NIFI-274 Adding @InputRequirement to processors and adding appropriate send and 
receive provenance events


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4bdd729d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4bdd729d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4bdd729d

Branch: refs/heads/NIFI-274
Commit: 4bdd729dc7fbaae2c9b76f742d763155a73918ef
Parents: d328ac4
Author: Bryan Bende <[email protected]>
Authored: Wed Nov 4 16:09:04 2015 -0500
Committer: Bryan Bende <[email protected]>
Committed: Wed Nov 4 16:09:04 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 31 +++++++++-------
 .../nifi/processors/standard/PutSyslog.java     | 15 +++++++-
 .../processors/standard/TestListenSyslog.java   | 38 ++++++++++++++++++--
 .../nifi/processors/standard/TestPutSyslog.java | 18 ++++++++++
 4 files changed, 87 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4bdd729d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 5f76beb..320b69e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -69,7 +70,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
-
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port 
over TCP or UDP. Incoming messages are checked against regular " +
         "expressions for RFC5424 and RFC3164 formatted messages. The format of 
each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@@ -114,7 +115,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .build();
     public static final PropertyDescriptor MAX_CONNECTIONS = new 
PropertyDescriptor.Builder()
             .name("Max Number of TCP Connections")
-            .description("The maximum number of concurrent connections to 
accept syslog messages in TCP mode")
+            .description("The maximum number of concurrent connections to 
accept Syslog messages in TCP mode.")
             .addValidator(StandardValidators.createLongValidator(1, 65535, 
true))
             .defaultValue("2")
             .required(true)
@@ -183,14 +184,14 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
         final int maxChannelBufferSize = 
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
-        final int maxConnections; 
-        
+        final int maxConnections;
+
         if (protocol.equals(UDP_VALUE)) {
             maxConnections = 1;
         } else{
             maxConnections = 
context.getProperty(MAX_CONNECTIONS).asLong().intValue();
         }
-        
+
         parser = new SyslogParser(Charset.forName(charSet));
         bufferPool = new BufferPool(maxConnections, bufferSize, false, 
Integer.MAX_VALUE);
         syslogEvents = new LinkedBlockingQueue<>(10);
@@ -243,6 +244,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         }
 
         final SyslogEvent event = initialEvent;
+        final String port = context.getProperty(PORT).getValue();
+        final String protocol = context.getProperty(PROTOCOL).getValue();
 
         final Map<String,String> attributes = new HashMap<>();
         attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
@@ -254,13 +257,16 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
         attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
         attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
         attributes.put(SyslogAttributes.VALID.key(), 
String.valueOf(event.isValid()));
-        attributes.put(SyslogAttributes.PROTOCOL.key(), 
context.getProperty(PROTOCOL).getValue());
-        attributes.put(SyslogAttributes.PORT.key(), 
context.getProperty(PORT).getValue());
+        attributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
+        attributes.put(SyslogAttributes.PORT.key(), port);
         attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
         FlowFile flowFile = session.create();
         flowFile = session.putAllAttributes(flowFile, attributes);
 
+        final String transitUri = new 
StringBuilder().append(protocol).append("://").append(event.getSender())
+                .append(":").append(port).toString();
+
         try {
             // write the raw bytes of the message as the FlowFile content
             flowFile = session.write(flowFile, new OutputStreamCallback() {
@@ -273,6 +279,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             if (event.isValid()) {
                 getLogger().info("Transferring {} to success", new 
Object[]{flowFile});
                 session.transfer(flowFile, REL_SUCCESS);
+                session.getProvenanceReporter().receive(flowFile, transitUri);
             } else {
                 getLogger().info("Transferring {} to invalid", new 
Object[]{flowFile});
                 session.transfer(flowFile, REL_INVALID);
@@ -464,7 +471,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                     IOUtils.closeQuietly(socketChannel);
                                     continue;
                                 }
-                                logger.debug("Accepted incoming connection 
from {}", 
+                                logger.debug("Accepted incoming connection 
from {}",
                                         new 
Object[]{socketChannel.getRemoteAddress().toString()} );
                                 // Set socket to non-blocking, and register 
with selector
                                 socketChannel.configureBlocking(false);
@@ -478,21 +485,21 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
                                 // Clear out the operations the select is 
interested in until done reading
                                 key.interestOps(0);
                                 // Create and execute the read handler
-                                final SocketChannelHandler handler = new 
SocketChannelHandler(key, this, 
+                                final SocketChannelHandler handler = new 
SocketChannelHandler(key, this,
                                         syslogParser, syslogEvents, logger);
                                 // and launch the thread
                                 executor.execute(handler);
                             }
                         }
                     }
-                    // Add back all idle sockets to the select 
+                    // Add back all idle sockets to the select
                     SelectionKey key;
                     while((key = keyQueue.poll()) != null){
                         key.interestOps(SelectionKey.OP_READ);
                     }
                 } catch (IOException e) {
                     logger.error("Error accepting connection from 
SocketChannel", e);
-                } 
+                }
             }
         }
 
@@ -612,7 +619,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                         }
                     }
                     // Preserve bytes in buffer for next call to run
-                    // NOTE: This code could benefit from the  two ByteBuffer 
read calls to avoid 
+                    // NOTE: This code could benefit from the  two ByteBuffer 
read calls to avoid
                     //  this compact for higher throughput
                     socketBuffer.reset();
                     socketBuffer.compact();

http://git-wip-us.apache.org/repos/asf/nifi/blob/4bdd729d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 5e558ca..b7e7a9c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -33,6 +34,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.util.ObjectHolder;
+import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @TriggerWhenEmpty
 @Tags({"syslog", "put", "udp", "tcp", "logs"})
 @CapabilityDescription("Sends Syslog messages to a given host and port over 
TCP or UDP. Messages are constructed from the \"Message ___\" properties of the 
processor " +
@@ -59,7 +62,7 @@ import java.util.regex.Pattern;
         "(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is 
optional.  The constructed messages are checked against regular expressions for 
" +
         "RFC5424 and RFC3164 formatted messages. The timestamp can be an 
RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or 
\"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
         "or it can be an RFC3164 timestamp with a format of \"MMM d 
HH:mm:ss\". If a message is constructed that does not form a valid Syslog 
message according to the " +
-        "above description, then it is routed to the invalid relationship. 
Valid messages are pushed to Syslog with successes routed to the success 
relationship, and " +
+        "above description, then it is routed to the invalid relationship. 
Valid messages are sent to the Syslog server and successes are routed to the 
success relationship, " +
         "failures routed to the failure relationship.")
 public class PutSyslog extends AbstractSyslogProcessor {
 
@@ -277,9 +280,14 @@ public class PutSyslog extends AbstractSyslogProcessor {
             }
         }
 
+        final String port = context.getProperty(PORT).getValue();
+        final String host = context.getProperty(HOSTNAME).getValue();
+        final String transitUri = new 
StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
         final ObjectHolder<IOException> exceptionHolder = new 
ObjectHolder<>(null);
+
         try {
             for (FlowFile flowFile : flowFiles) {
+                final StopWatch timer = new StopWatch(true);
                 final String priority = 
context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
                 final String version = 
context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
                 final String timestamp = 
context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
@@ -304,6 +312,11 @@ public class PutSyslog extends AbstractSyslogProcessor {
                         }
 
                         sender.send(messageBuilder.toString());
+                        timer.stop();
+
+                        final long duration = 
timer.getDuration(TimeUnit.MILLISECONDS);
+                        session.getProvenanceReporter().send(flowFile, 
transitUri, duration, true);
+
                         getLogger().info("Transferring {} to success", new 
Object[]{flowFile});
                         session.transfer(flowFile, REL_SUCCESS);
                     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4bdd729d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 9795545..f0eb345 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -16,12 +16,15 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.io.nio.BufferPool;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -91,6 +94,15 @@ public class TestListenSyslog {
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
             checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
 
+            final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
+            Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
+
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -132,6 +144,16 @@ public class TestListenSyslog {
 
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
             checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
+            Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
+
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -174,6 +196,16 @@ public class TestListenSyslog {
 
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
             checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
+            Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
+
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -244,7 +276,7 @@ public class TestListenSyslog {
     }
 
 
-    private void checkFlowFile(MockFlowFile flowFile, int port, String 
protocol) {
+    private void checkFlowFile(final MockFlowFile flowFile, final int port, 
final String protocol) {
         flowFile.assertContentEquals(VALID_MESSAGE);
         Assert.assertEquals(PRI, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
         Assert.assertEquals(SEV, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
@@ -255,6 +287,7 @@ public class TestListenSyslog {
         Assert.assertEquals("true", 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
         Assert.assertEquals(String.valueOf(port), 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
         Assert.assertEquals(protocol, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
+        
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));
     }
 
     /**
@@ -396,7 +429,8 @@ public class TestListenSyslog {
         }
 
         @Override
-        protected ChannelReader createChannelReader(final String protocol, 
final BufferPool bufferPool, final SyslogParser syslogParser, final 
BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
+        protected ChannelReader createChannelReader(final String protocol, 
final BufferPool bufferPool, final SyslogParser syslogParser,
+                                                    final 
BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
             return new ChannelReader() {
                 @Override
                 public void open(int port, int maxBufferSize) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4bdd729d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
index 40a9123..eb0d3f4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -70,6 +72,14 @@ public class TestPutSyslog {
         runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
         Assert.assertEquals(1, sender.messages.size());
         Assert.assertEquals(expectedMessage, sender.messages.get(0));
+
+        final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+        Assert.assertNotNull(events);
+        Assert.assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
+        Assert.assertEquals("UDP://localhost:12345", event.getTransitUri());
     }
 
     @Test
@@ -95,6 +105,14 @@ public class TestPutSyslog {
         runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
         Assert.assertEquals(1, sender.messages.size());
         Assert.assertEquals(expectedMessage, 
sender.messages.get(0).replace("\n", ""));
+
+        final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+        Assert.assertNotNull(events);
+        Assert.assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
+        Assert.assertEquals("TCP://localhost:12345", event.getTransitUri());
     }
 
     @Test

Reply via email to