NIFI-274 Adding @InputRequirement to processors and adding appropriate send and receive provenance events
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4bdd729d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4bdd729d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4bdd729d Branch: refs/heads/NIFI-274 Commit: 4bdd729dc7fbaae2c9b76f742d763155a73918ef Parents: d328ac4 Author: Bryan Bende <[email protected]> Authored: Wed Nov 4 16:09:04 2015 -0500 Committer: Bryan Bende <[email protected]> Committed: Wed Nov 4 16:09:04 2015 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenSyslog.java | 31 +++++++++------- .../nifi/processors/standard/PutSyslog.java | 15 +++++++- .../processors/standard/TestListenSyslog.java | 38 ++++++++++++++++++-- .../nifi/processors/standard/TestPutSyslog.java | 18 ++++++++++ 4 files changed, 87 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4bdd729d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 5f76beb..320b69e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -69,7 +70,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent; import org.apache.nifi.processors.standard.util.SyslogParser; import org.apache.nifi.stream.io.ByteArrayOutputStream; - +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " + "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " + @@ -114,7 +115,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { .build(); public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() .name("Max Number of TCP Connections") - .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") + .description("The maximum number of concurrent connections to accept Syslog messages in TCP mode.") .addValidator(StandardValidators.createLongValidator(1, 65535, true)) .defaultValue("2") .required(true) @@ -183,14 +184,14 @@ public class ListenSyslog extends AbstractSyslogProcessor { final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); - final int maxConnections; - + final int maxConnections; + if (protocol.equals(UDP_VALUE)) { maxConnections = 1; } else{ maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); } - + parser = new SyslogParser(Charset.forName(charSet)); bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE); syslogEvents = new LinkedBlockingQueue<>(10); @@ -243,6 +244,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { } final SyslogEvent event = initialEvent; + final String port = context.getProperty(PORT).getValue(); + final String protocol = context.getProperty(PROTOCOL).getValue(); final Map<String,String> attributes = new HashMap<>(); attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority()); @@ -254,13 +257,16 @@ public class ListenSyslog extends AbstractSyslogProcessor { attributes.put(SyslogAttributes.SENDER.key(), event.getSender()); attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody()); attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid())); - attributes.put(SyslogAttributes.PROTOCOL.key(), context.getProperty(PROTOCOL).getValue()); - attributes.put(SyslogAttributes.PORT.key(), context.getProperty(PORT).getValue()); + attributes.put(SyslogAttributes.PROTOCOL.key(), protocol); + attributes.put(SyslogAttributes.PORT.key(), port); attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); FlowFile flowFile = session.create(); flowFile = session.putAllAttributes(flowFile, attributes); + final String transitUri = new StringBuilder().append(protocol).append("://").append(event.getSender()) + .append(":").append(port).toString(); + try { // write the raw bytes of the message as the FlowFile content flowFile = session.write(flowFile, new OutputStreamCallback() { @@ -273,6 +279,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { if (event.isValid()) { getLogger().info("Transferring {} to success", new Object[]{flowFile}); session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, transitUri); } else { getLogger().info("Transferring {} to invalid", new Object[]{flowFile}); session.transfer(flowFile, REL_INVALID); @@ -464,7 +471,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { IOUtils.closeQuietly(socketChannel); continue; } - logger.debug("Accepted incoming connection from {}", + logger.debug("Accepted incoming connection from {}", new Object[]{socketChannel.getRemoteAddress().toString()} ); // Set socket to non-blocking, and register with selector socketChannel.configureBlocking(false); @@ -478,21 +485,21 @@ public class ListenSyslog extends AbstractSyslogProcessor { // Clear out the operations the select is interested in until done reading key.interestOps(0); // Create and execute the read handler - final SocketChannelHandler handler = new SocketChannelHandler(key, this, + final SocketChannelHandler handler = new SocketChannelHandler(key, this, syslogParser, syslogEvents, logger); // and launch the thread executor.execute(handler); } } } - // Add back all idle sockets to the select + // Add back all idle sockets to the select SelectionKey key; while((key = keyQueue.poll()) != null){ key.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { logger.error("Error accepting connection from SocketChannel", e); - } + } } } @@ -612,7 +619,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } // Preserve bytes in buffer for next call to run - // NOTE: This code could benefit from the two ByteBuffer read calls to avoid + // NOTE: This code could benefit from the two ByteBuffer read calls to avoid // this compact for higher throughput socketBuffer.reset(); socketBuffer.compact(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4bdd729d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 5e558ca..b7e7a9c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -33,6 +34,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.SyslogParser; import org.apache.nifi.util.ObjectHolder; +import org.apache.nifi.util.StopWatch; import java.io.IOException; import java.net.InetAddress; @@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @TriggerWhenEmpty @Tags({"syslog", "put", "udp", "tcp", "logs"}) @CapabilityDescription("Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor " + @@ -59,7 +62,7 @@ import java.util.regex.Pattern; "(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The constructed messages are checked against regular expressions for " + "RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " + "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " + - "above description, then it is routed to the invalid relationship. Valid messages are pushed to Syslog with successes routed to the success relationship, and " + + "above description, then it is routed to the invalid relationship. Valid messages are sent to the Syslog server and successes are routed to the success relationship, " + "failures routed to the failure relationship.") public class PutSyslog extends AbstractSyslogProcessor { @@ -277,9 +280,14 @@ public class PutSyslog extends AbstractSyslogProcessor { } } + final String port = context.getProperty(PORT).getValue(); + final String host = context.getProperty(HOSTNAME).getValue(); + final String transitUri = new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null); + try { for (FlowFile flowFile : flowFiles) { + final StopWatch timer = new StopWatch(true); final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue(); final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue(); final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); @@ -304,6 +312,11 @@ public class PutSyslog extends AbstractSyslogProcessor { } sender.send(messageBuilder.toString()); + timer.stop(); + + final long duration = timer.getDuration(TimeUnit.MILLISECONDS); + session.getProvenanceReporter().send(flowFile, transitUri, duration, true); + getLogger().info("Transferring {} to success", new Object[]{flowFile}); session.transfer(flowFile, REL_SUCCESS); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/nifi/blob/4bdd729d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 9795545..f0eb345 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -16,12 +16,15 @@ */ package org.apache.nifi.processors.standard; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.io.nio.BufferPool; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.standard.util.SyslogEvent; import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -91,6 +94,15 @@ public class TestListenSyslog { MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue()); + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0", + event.getTransitUri()); + } finally { // unschedule to close connections proc.onUnscheduled(); @@ -132,6 +144,16 @@ public class TestListenSyslog { MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0", + event.getTransitUri()); + } finally { // unschedule to close connections proc.onUnscheduled(); @@ -174,6 +196,16 @@ public class TestListenSyslog { MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(numMessages, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0", + event.getTransitUri()); + } finally { // unschedule to close connections proc.onUnscheduled(); @@ -244,7 +276,7 @@ public class TestListenSyslog { } - private void checkFlowFile(MockFlowFile flowFile, int port, String protocol) { + private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) { flowFile.assertContentEquals(VALID_MESSAGE); Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key())); Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key())); @@ -255,6 +287,7 @@ public class TestListenSyslog { Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key())); Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key())); Assert.assertEquals(protocol, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key())); + Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()))); } /** @@ -396,7 +429,8 @@ public class TestListenSyslog { } @Override - protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) { + protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, + final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) { return new ChannelReader() { @Override public void open(int port, int maxBufferSize) throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/4bdd729d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java index 40a9123..eb0d3f4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java @@ -17,6 +17,8 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.processor.Processor; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -70,6 +72,14 @@ public class TestPutSyslog { runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); Assert.assertEquals(1, sender.messages.size()); Assert.assertEquals(expectedMessage, sender.messages.get(0)); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType()); + Assert.assertEquals("UDP://localhost:12345", event.getTransitUri()); } @Test @@ -95,6 +105,14 @@ public class TestPutSyslog { runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); Assert.assertEquals(1, sender.messages.size()); Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", "")); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.SEND, event.getEventType()); + Assert.assertEquals("TCP://localhost:12345", event.getTransitUri()); } @Test
