NIFI-274 - Fixing TestListenSyslog, fixing default buffer size to be bytes,
adding syslog.protocol to attributes
- Adding syslog.port to ListenSyslog attributes, logging at warn level
when rejecting tcp connections
- Adding @InputRequirement to processors and adding appropriate send
and receive provenance events
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/618f22e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/618f22e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/618f22e1
Branch: refs/heads/master
Commit: 618f22e110032df09d77a80587b553e6995038e2
Parents: 5611dac
Author: Bryan Bende <[email protected]>
Authored: Wed Nov 4 09:01:52 2015 -0500
Committer: Tony Kurc <[email protected]>
Committed: Wed Nov 4 18:01:59 2015 -0500
----------------------------------------------------------------------
.../standard/AbstractSyslogProcessor.java | 4 +-
.../nifi/processors/standard/ListenSyslog.java | 36 +++++++++++----
.../nifi/processors/standard/PutSyslog.java | 15 +++++-
.../processors/standard/TestListenSyslog.java | 48 ++++++++++++++++++--
.../nifi/processors/standard/TestPutSyslog.java | 18 ++++++++
5 files changed, 104 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index f7d5eeb..fea01bd 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -64,7 +64,9 @@ public abstract class AbstractSyslogProcessor extends
AbstractProcessor {
HOSTNAME("syslog.hostname"),
SENDER("syslog.sender"),
BODY("syslog.body"),
- VALID("syslog.valid");
+ VALID("syslog.valid"),
+ PROTOCOL("syslog.protocol"),
+ PORT("syslog.pprt");
private String key;
http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 8012b88..1660e3a 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
@@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -68,7 +70,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
-
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port
over TCP or UDP. Incoming messages are checked against regular " +
"expressions for RFC5424 and RFC3164 formatted messages. The format of
each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@@ -88,6 +90,8 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
@WritesAttribute(attribute="syslog.body", description="The
body of the Syslog message, everything after the hostname."),
@WritesAttribute(attribute="syslog.valid", description="An
indicator of whether this message matched the expected formats. " +
"If this value is false, the other attributes will
be empty and only the original message will be available in the content."),
+ @WritesAttribute(attribute="syslog.protocol",
description="The protocol over which the Syslog message was received."),
+ @WritesAttribute(attribute="syslog.port", description="The
port over which the Syslog message was received."),
@WritesAttribute(attribute="mime.type", description="The
mime.type of the FlowFile which will be text/plain for Syslog messages.")})
public class ListenSyslog extends AbstractSyslogProcessor {
@@ -97,7 +101,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
"incoming Syslog messages. When UDP is selected each
buffer will hold one Syslog message. When TCP is selected messages are read " +
"from an incoming connection until the buffer is full, or
the connection is closed. ")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .defaultValue("65507 KB")
+ .defaultValue("65507 B")
.required(true)
.build();
public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new
PropertyDescriptor.Builder()
@@ -110,8 +114,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.required(true)
.build();
public static final PropertyDescriptor MAX_CONNECTIONS = new
PropertyDescriptor.Builder()
- .name("Max number of TCP connections")
- .description("The maximum number of concurrent connections to
accept syslog messages in TCP mode")
+ .name("Max Number of TCP Connections")
+ .description("The maximum number of concurrent connections to
accept Syslog messages in TCP mode.")
.addValidator(StandardValidators.createLongValidator(1, 65535,
true))
.defaultValue("2")
.required(true)
@@ -142,8 +146,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(PORT);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
- descriptors.add(CHARSET);
descriptors.add(MAX_CONNECTIONS);
+ descriptors.add(CHARSET);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@@ -184,7 +188,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
if (protocol.equals(UDP_VALUE.getValue())) {
maxConnections = 1;
- } else{
+ } else {
maxConnections =
context.getProperty(MAX_CONNECTIONS).asLong().intValue();
}
@@ -240,6 +244,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
final SyslogEvent event = initialEvent;
+ final String port = context.getProperty(PORT).getValue();
+ final String protocol = context.getProperty(PROTOCOL).getValue();
final Map<String,String> attributes = new HashMap<>();
attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
@@ -251,11 +257,16 @@ public class ListenSyslog extends AbstractSyslogProcessor
{
attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.VALID.key(),
String.valueOf(event.isValid()));
+ attributes.put(SyslogAttributes.PROTOCOL.key(), protocol);
+ attributes.put(SyslogAttributes.PORT.key(), port);
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
+ final String transitUri = new
StringBuilder().append(protocol).append("://").append(event.getSender())
+ .append(":").append(port).toString();
+
try {
// write the raw bytes of the message as the FlowFile content
flowFile = session.write(flowFile, new OutputStreamCallback() {
@@ -268,6 +279,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
if (event.isValid()) {
getLogger().info("Transferring {} to success", new
Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, transitUri);
} else {
getLogger().info("Transferring {} to invalid", new
Object[]{flowFile});
session.transfer(flowFile, REL_INVALID);
@@ -454,7 +466,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// Check for available connections
if (currentConnections.incrementAndGet() >
maxConnections){
currentConnections.decrementAndGet();
- logger.info("Rejecting connection from {}
because max connections has been met", new Object[]{
socketChannel.getRemoteAddress().toString() });
+ logger.warn("Rejecting connection from {}
because max connections has been met",
+ new Object[]{
socketChannel.getRemoteAddress().toString() });
IOUtils.closeQuietly(socketChannel);
continue;
}
@@ -494,8 +507,11 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public int getPort() {
// Return the port for the key listening for accepts
for(SelectionKey key : selector.keys()){
- if (key.isValid() && key.isAcceptable()) {
- return
((SocketChannel)key.channel()).socket().getLocalPort();
+ if (key.isValid()) {
+ final Channel channel = key.channel();
+ if (channel instanceof ServerSocketChannel) {
+ return
((ServerSocketChannel)channel).socket().getLocalPort();
+ }
}
}
return 0;
@@ -619,7 +635,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
- // Treat same as closed socket
+ // Treat same as closed socket
eof = true;
} finally {
if(eof == true) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 5e558ca..b7e7a9c 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -33,6 +34,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.util.ObjectHolder;
+import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.net.InetAddress;
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@TriggerWhenEmpty
@Tags({"syslog", "put", "udp", "tcp", "logs"})
@CapabilityDescription("Sends Syslog messages to a given host and port over
TCP or UDP. Messages are constructed from the \"Message ___\" properties of the
processor " +
@@ -59,7 +62,7 @@ import java.util.regex.Pattern;
"(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is
optional. The constructed messages are checked against regular expressions for
" +
"RFC5424 and RFC3164 formatted messages. The timestamp can be an
RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or
\"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
"or it can be an RFC3164 timestamp with a format of \"MMM d
HH:mm:ss\". If a message is constructed that does not form a valid Syslog
message according to the " +
- "above description, then it is routed to the invalid relationship.
Valid messages are pushed to Syslog with successes routed to the success
relationship, and " +
+ "above description, then it is routed to the invalid relationship.
Valid messages are sent to the Syslog server and successes are routed to the
success relationship, " +
"failures routed to the failure relationship.")
public class PutSyslog extends AbstractSyslogProcessor {
@@ -277,9 +280,14 @@ public class PutSyslog extends AbstractSyslogProcessor {
}
}
+ final String port = context.getProperty(PORT).getValue();
+ final String host = context.getProperty(HOSTNAME).getValue();
+ final String transitUri = new
StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
final ObjectHolder<IOException> exceptionHolder = new
ObjectHolder<>(null);
+
try {
for (FlowFile flowFile : flowFiles) {
+ final StopWatch timer = new StopWatch(true);
final String priority =
context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
final String version =
context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
final String timestamp =
context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
@@ -304,6 +312,11 @@ public class PutSyslog extends AbstractSyslogProcessor {
}
sender.send(messageBuilder.toString());
+ timer.stop();
+
+ final long duration =
timer.getDuration(TimeUnit.MILLISECONDS);
+ session.getProvenanceReporter().send(flowFile,
transitUri, duration, true);
+
getLogger().info("Transferring {} to success", new
Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index eb71f88..f0eb345 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -16,12 +16,15 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.io.nio.BufferPool;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -89,7 +92,16 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the datagrams",
numMessages, numTransfered);
MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
- checkFlowFile(flowFile);
+ checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
+
+ final List<ProvenanceEventRecord> events =
runner.getProvenanceEvents();
+ Assert.assertNotNull(events);
+ Assert.assertEquals(numMessages, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ Assert.assertEquals(ProvenanceEventType.RECEIVE,
event.getEventType());
+ Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" +
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+ event.getTransitUri());
} finally {
// unschedule to close connections
@@ -131,7 +143,17 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the messages",
numMessages, numTransfered);
MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
- checkFlowFile(flowFile);
+ checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+ final List<ProvenanceEventRecord> events =
runner.getProvenanceEvents();
+ Assert.assertNotNull(events);
+ Assert.assertEquals(numMessages, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ Assert.assertEquals(ProvenanceEventType.RECEIVE,
event.getEventType());
+ Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" +
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+ event.getTransitUri());
+
} finally {
// unschedule to close connections
proc.onUnscheduled();
@@ -143,6 +165,7 @@ public class TestListenSyslog {
final ListenSyslog proc = new ListenSyslog();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenSyslog.PROTOCOL,
ListenSyslog.TCP_VALUE.getValue());
+ runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
runner.setProperty(ListenSyslog.PORT, "0");
// schedule to start listening on a random port
@@ -172,7 +195,17 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the messages",
numMessages, numTransfered);
MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
- checkFlowFile(flowFile);
+ checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+ final List<ProvenanceEventRecord> events =
runner.getProvenanceEvents();
+ Assert.assertNotNull(events);
+ Assert.assertEquals(numMessages, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ Assert.assertEquals(ProvenanceEventType.RECEIVE,
event.getEventType());
+ Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" +
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0",
+ event.getTransitUri());
+
} finally {
// unschedule to close connections
proc.onUnscheduled();
@@ -210,6 +243,8 @@ public class TestListenSyslog {
proc.onTrigger(context, processSessionFactory);
numTransfered =
runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
}
+
+ // all messages should be transferred to invalid
Assert.assertEquals("Did not process all the messages",
numMessages, numTransfered);
} finally {
@@ -241,7 +276,7 @@ public class TestListenSyslog {
}
- private void checkFlowFile(MockFlowFile flowFile) {
+ private void checkFlowFile(final MockFlowFile flowFile, final int port,
final String protocol) {
flowFile.assertContentEquals(VALID_MESSAGE);
Assert.assertEquals(PRI,
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(SEV,
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
@@ -250,6 +285,9 @@ public class TestListenSyslog {
Assert.assertEquals(HOST,
flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(BODY,
flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
Assert.assertEquals("true",
flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
+ Assert.assertEquals(String.valueOf(port),
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
+ Assert.assertEquals(protocol,
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
+
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));
}
/**
@@ -392,7 +430,7 @@ public class TestListenSyslog {
@Override
protected ChannelReader createChannelReader(final String protocol,
final BufferPool bufferPool, final SyslogParser syslogParser,
- final BlockingQueue<SyslogEvent> syslogEvents, int
maxConnections) {
+ final
BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
return new ChannelReader() {
@Override
public void open(int port, int maxBufferSize) throws
IOException {
http://git-wip-us.apache.org/repos/asf/nifi/blob/618f22e1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
index 40a9123..eb0d3f4 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
@@ -17,6 +17,8 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@@ -70,6 +72,14 @@ public class TestPutSyslog {
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage, sender.messages.get(0));
+
+ final List<ProvenanceEventRecord> events =
runner.getProvenanceEvents();
+ Assert.assertNotNull(events);
+ Assert.assertEquals(1, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
+ Assert.assertEquals("UDP://localhost:12345", event.getTransitUri());
}
@Test
@@ -95,6 +105,14 @@ public class TestPutSyslog {
runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
Assert.assertEquals(1, sender.messages.size());
Assert.assertEquals(expectedMessage,
sender.messages.get(0).replace("\n", ""));
+
+ final List<ProvenanceEventRecord> events =
runner.getProvenanceEvents();
+ Assert.assertNotNull(events);
+ Assert.assertEquals(1, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType());
+ Assert.assertEquals("TCP://localhost:12345", event.getTransitUri());
}
@Test