NIFI-274 - Fixing TestListenSyslog, fixing default buffer size to be bytes, 
adding syslog.protocol to attributes
         - Adding syslog.port to ListenSyslog attributes, logging at warn level 
when rejecting tcp connections
         - Adding @InputRequirement to processors and adding appropriate send 
and receive provenance events


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/618f22e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/618f22e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/618f22e1

Branch: refs/heads/master
Commit: 618f22e110032df09d77a80587b553e6995038e2
Parents: 5611dac
Author: Bryan Bende <[email protected]>
Authored: Wed Nov 4 09:01:52 2015 -0500
Committer: Tony Kurc <[email protected]>
Committed: Wed Nov 4 18:01:59 2015 -0500

----------------------------------------------------------------------
 .../standard/AbstractSyslogProcessor.java       |  4 +-
 .../nifi/processors/standard/ListenSyslog.java  | 36 +++++++++++----
 .../nifi/processors/standard/PutSyslog.java     | 15 +++++-
 .../processors/standard/TestListenSyslog.java   | 48 ++++++++++++++++++--
 .../nifi/processors/standard/TestPutSyslog.java | 18 ++++++++
 5 files changed, 104 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index f7d5eeb..fea01bd 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -64,7 +64,9 @@ public abstract class AbstractSyslogProcessor extends 
AbstractProcessor {
         HOSTNAME("syslog.hostname"),
         SENDER("syslog.sender"),
         BODY("syslog.body"),
-        VALID("syslog.valid");
+        VALID("syslog.valid"),
+        PROTOCOL("syslog.protocol"),
+        PORT("syslog.pprt");
 
         private String key;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 8012b88..1660e3a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
@@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -68,7 +70,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
-
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port 
over TCP or UDP. Incoming messages are checked against regular " +
         "expressions for RFC5424 and RFC3164 formatted messages. The format of 
each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@@ -88,6 +90,8 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
                     @WritesAttribute(attribute="syslog.body", description="The 
body of the Syslog message, everything after the hostname."),
                     @WritesAttribute(attribute="syslog.valid", description="An 
indicator of whether this message matched the expected formats. " +
                             "If this value is false, the other attributes will 
be empty and only the original message will be available in the content."),
+                    @WritesAttribute(attribute="syslog.protocol", 
description="The protocol over which the Syslog message was received."),
+                    @WritesAttribute(attribute="syslog.port", description="The 
port over which the Syslog message was received."),
                     @WritesAttribute(attribute="mime.type", description="The 
mime.type of the FlowFile which will be text/plain for Syslog messages.")})
 public class ListenSyslog extends AbstractSyslogProcessor {
 
@@ -97,7 +101,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                     "incoming Syslog messages. When UDP is selected each 
buffer will hold one Syslog message. When TCP is selected messages are read " +
                     "from an incoming connection until the buffer is full, or 
the connection is closed. ")
             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("65507 KB")
+            .defaultValue("65507 B")
             .required(true)
             .build();
     public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
@@ -110,8 +114,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .required(true)
             .build();
     public static final PropertyDescriptor MAX_CONNECTIONS = new 
PropertyDescriptor.Builder()
-            .name("Max number of TCP connections")
-            .description("The maximum number of concurrent connections to 
accept syslog messages in TCP mode")
+            .name("Max Number of TCP Connections")
+            .description("The maximum number of concurrent connections to 
accept Syslog messages in TCP mode.")
             .addValidator(StandardValidators.createLongValidator(1, 65535, 
true))
             .defaultValue("2")
             .required(true)
@@ -142,8 +146,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(PORT);
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
-        descriptors.add(CHARSET);
         descriptors.add(MAX_CONNECTIONS);
+        descriptors.add(CHARSET);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -184,7 +188,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         if (protocol.equals(UDP_VALUE.getValue())) {
             maxConnections = 1;
-        } else{
+        } else {
             maxConnections = 
context.getProperty(MAX_CONNECTIONS).asLong().intValue();
         }
 
@@ -240,6 +244,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         }
 
         final SyslogEvent event = initialEvent;
+        final String port = context.getProperty(PORT).getValue();
+        final String protocol = context.getProperty(PROTOCOL).getValue();
 
         final Map<String,String> attributes = new HashMap<>();
         attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
@@ -251,11 +257,16 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
         attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
         attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
         attributes.put(SyslogAttributes.VALID.key(), 
String.valueOf(event.isValid()));
+        attributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
+        attributes.put(SyslogAttributes.PORT.key(), port);
         attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
         FlowFile flowFile = session.create();
         flowFile = session.putAllAttributes(flowFile, attributes);
 
+        final String transitUri = new 
StringBuilder().append(protocol).append("://").append(event.getSender())
+                .append(":").append(port).toString();
+
         try {
             // write the raw bytes of the message as the FlowFile content
             flowFile = session.write(flowFile, new OutputStreamCallback() {
@@ -268,6 +279,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             if (event.isValid()) {
                 getLogger().info("Transferring {} to success", new 
Object[]{flowFile});
                 session.transfer(flowFile, REL_SUCCESS);
+                session.getProvenanceReporter().receive(flowFile, transitUri);
             } else {
                 getLogger().info("Transferring {} to invalid", new 
Object[]{flowFile});
                 session.transfer(flowFile, REL_INVALID);
@@ -454,7 +466,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                 // Check for available connections
                                 if (currentConnections.incrementAndGet() > 
maxConnections){
                                     currentConnections.decrementAndGet();
-                                    logger.info("Rejecting connection from {} 
because max connections has been met", new Object[]{ 
socketChannel.getRemoteAddress().toString() });
+                                    logger.warn("Rejecting connection from {} 
because max connections has been met",
+                                            new Object[]{ 
socketChannel.getRemoteAddress().toString() });
                                     IOUtils.closeQuietly(socketChannel);
                                     continue;
                                 }
@@ -494,8 +507,11 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         public int getPort() {
             // Return the port for the key listening for accepts
             for(SelectionKey key : selector.keys()){
-                if (key.isValid() && key.isAcceptable()) {
-                    return 
((SocketChannel)key.channel()).socket().getLocalPort();
+                if (key.isValid()) {
+                    final Channel channel = key.channel();
+                    if (channel instanceof  ServerSocketChannel) {
+                        return 
((ServerSocketChannel)channel).socket().getLocalPort();
+                    }
                 }
             }
             return 0;
@@ -619,7 +635,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 eof = true;
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
-             // Treat same as closed socket
+                // Treat same as closed socket
                 eof = true;
             } finally {
                 if(eof == true) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 5e558ca..b7e7a9c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -33,6 +34,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.util.ObjectHolder;
+import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @TriggerWhenEmpty
 @Tags({"syslog", "put", "udp", "tcp", "logs"})
 @CapabilityDescription("Sends Syslog messages to a given host and port over 
TCP or UDP. Messages are constructed from the \"Message ___\" properties of the 
processor " +
@@ -59,7 +62,7 @@ import java.util.regex.Pattern;
         "(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is 
optional.  The constructed messages are checked against regular expressions for 
" +
         "RFC5424 and RFC3164 formatted messages. The timestamp can be an 
RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or 
\"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
         "or it can be an RFC3164 timestamp with a format of \"MMM d 
HH:mm:ss\". If a message is constructed that does not form a valid Syslog 
message according to the " +
-        "above description, then it is routed to the invalid relationship. 
Valid messages are pushed to Syslog with successes routed to the success 
relationship, and " +
+        "above description, then it is routed to the invalid relationship. 
Valid messages are sent to the Syslog server and successes are routed to the 
success relationship, " +
         "failures routed to the failure relationship.")
 public class PutSyslog extends AbstractSyslogProcessor {
 
@@ -277,9 +280,14 @@ public class PutSyslog extends AbstractSyslogProcessor {
             }
         }
 
+        final String port = context.getProperty(PORT).getValue();
+        final String host = context.getProperty(HOSTNAME).getValue();
+        final String transitUri = new 
StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
         final ObjectHolder<IOException> exceptionHolder = new 
ObjectHolder<>(null);
+
         try {
             for (FlowFile flowFile : flowFiles) {
+                final StopWatch timer = new StopWatch(true);
                 final String priority = 
context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
                 final String version = 
context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
                 final String timestamp = 
context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
@@ -304,6 +312,11 @@ public class PutSyslog extends AbstractSyslogProcessor {
                         }
 
                         sender.send(messageBuilder.toString());
+                        timer.stop();
+
+                        final long duration = 
timer.getDuration(TimeUnit.MILLISECONDS);
+                        session.getProvenanceReporter().send(flowFile, 
transitUri, duration, true);
+
                         getLogger().info("Transferring {} to success", new 
Object[]{flowFile});
                         session.transfer(flowFile, REL_SUCCESS);
                     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index eb71f88..f0eb345 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -16,12 +16,15 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.io.nio.BufferPool;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -89,7 +92,16 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the datagrams", 
numMessages, numTransfered);
 
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
+            Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
 
         } finally {
             // unschedule to close connections
@@ -131,7 +143,17 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the messages", 
numMessages, numTransfered);
 
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
+            Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
+
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -143,6 +165,7 @@ public class TestListenSyslog {
         final ListenSyslog proc = new ListenSyslog();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(ListenSyslog.PROTOCOL, 
ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
         runner.setProperty(ListenSyslog.PORT, "0");
 
         // schedule to start listening on a random port
@@ -172,7 +195,17 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the messages", 
numMessages, numTransfered);
 
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, 
event.getEventType());
+            Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+                    event.getTransitUri());
+
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -210,6 +243,8 @@ public class TestListenSyslog {
                 proc.onTrigger(context, processSessionFactory);
                 numTransfered = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
             }
+
+            // all messages should be transferred to invalid
             Assert.assertEquals("Did not process all the messages", 
numMessages, numTransfered);
 
         } finally {
@@ -241,7 +276,7 @@ public class TestListenSyslog {
     }
 
 
-    private void checkFlowFile(MockFlowFile flowFile) {
+    private void checkFlowFile(final MockFlowFile flowFile, final int port, 
final String protocol) {
         flowFile.assertContentEquals(VALID_MESSAGE);
         Assert.assertEquals(PRI, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
         Assert.assertEquals(SEV, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
@@ -250,6 +285,9 @@ public class TestListenSyslog {
         Assert.assertEquals(HOST, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
         Assert.assertEquals(BODY, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
         Assert.assertEquals("true", 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
+        Assert.assertEquals(String.valueOf(port), 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
+        Assert.assertEquals(protocol, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
+        
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));
     }
 
     /**
@@ -392,7 +430,7 @@ public class TestListenSyslog {
 
         @Override
         protected ChannelReader createChannelReader(final String protocol, 
final BufferPool bufferPool, final SyslogParser syslogParser,
-                final BlockingQueue<SyslogEvent> syslogEvents, int 
maxConnections) {
+                                                    final 
BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
             return new ChannelReader() {
                 @Override
                 public void open(int port, int maxBufferSize) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
index 40a9123..eb0d3f4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -70,6 +72,14 @@ public class TestPutSyslog {
         runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
         Assert.assertEquals(1, sender.messages.size());
         Assert.assertEquals(expectedMessage, sender.messages.get(0));
+
+        final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+        Assert.assertNotNull(events);
+        Assert.assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
+        Assert.assertEquals("UDP://localhost:12345", event.getTransitUri());
     }
 
     @Test
@@ -95,6 +105,14 @@ public class TestPutSyslog {
         runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
         Assert.assertEquals(1, sender.messages.size());
         Assert.assertEquals(expectedMessage, 
sender.messages.get(0).replace("\n", ""));
+
+        final List<ProvenanceEventRecord> events = 
runner.getProvenanceEvents();
+        Assert.assertNotNull(events);
+        Assert.assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
+        Assert.assertEquals("TCP://localhost:12345", event.getTransitUri());
     }
 
     @Test

Reply via email to