Repository: nifi Updated Branches: refs/heads/master c59087bc3 -> e5281f1fc
NIFI-1221: Support batching of Syslog messages Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e5281f1f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e5281f1f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e5281f1f Branch: refs/heads/master Commit: e5281f1fc1865c653f8e2147622229d55c7d9ab1 Parents: c59087b Author: Mark Payne <[email protected]> Authored: Wed Nov 25 17:21:00 2015 -0500 Committer: Bryan Bende <[email protected]> Committed: Mon Nov 30 17:32:49 2015 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenSyslog.java | 397 ++++++++++++++----- .../nifi/processors/standard/ParseSyslog.java | 150 +++++++ .../nifi/processors/standard/PutSyslog.java | 2 + .../org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestListenSyslog.java | 227 +++++++---- .../processors/standard/TestParseSyslog.java | 61 +++ 6 files changed, 669 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index effaffc..fbe64ea 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -31,6 +31,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -47,16 +48,19 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.io.nio.BufferPool; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -70,6 +74,7 @@ import org.apache.nifi.processors.standard.util.SyslogEvent; import org.apache.nifi.processors.standard.util.SyslogParser; import org.apache.nifi.stream.io.ByteArrayOutputStream; +@SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " + @@ -92,7 +97,8 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream; "If this value is false, the other attributes will be empty and only the original message will be available in the content."), @WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."), @WritesAttribute(attribute="syslog.port", description="The port over which the Syslog message was received."), - @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")}) + @WritesAttribute(attribute = "mime.type", description = "The mime.type of the FlowFile which will be text/plain for Syslog messages.")}) +@SeeAlso({PutSyslog.class, ParseSyslog.class}) public class ListenSyslog extends AbstractSyslogProcessor { public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() @@ -120,6 +126,31 @@ public class ListenSyslog extends AbstractSyslogProcessor { .defaultValue("2") .required(true) .build(); + public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Max Batch Size") + .description( + "The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with " + + "the <Message Delimiter> up to this configured maximum number of messages") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1") + .required(true) + .build(); + public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() + .name("Message Delimiter") + .description("Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("\\n") + .required(true) + .build(); + public static final PropertyDescriptor PARSE_MESSAGES = new PropertyDescriptor.Builder() + .name("Parse Messages") + .description("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only " + + "contain the sender, protocol, and port, and no additional attributes.") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -133,11 +164,12 @@ public class ListenSyslog extends AbstractSyslogProcessor { private Set<Relationship> relationships; private List<PropertyDescriptor> descriptors; - private volatile BufferPool bufferPool; private volatile ChannelReader channelReader; private volatile SyslogParser parser; - private volatile BlockingQueue<SyslogEvent> syslogEvents; - private volatile BlockingQueue<SyslogEvent> errorEvents; + private volatile BlockingQueue<ByteBuffer> bufferPool; + private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new LinkedBlockingQueue<>(10); + private volatile BlockingQueue<RawSyslogEvent> errorEvents = new LinkedBlockingQueue<>(); + private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents. @Override protected void init(final ProcessorInitializationContext context) { @@ -147,6 +179,9 @@ public class ListenSyslog extends AbstractSyslogProcessor { descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE); descriptors.add(MAX_CONNECTIONS); + descriptors.add(MAX_BATCH_SIZE); + descriptors.add(MESSAGE_DELIMITER); + descriptors.add(PARSE_MESSAGES); descriptors.add(CHARSET); this.descriptors = Collections.unmodifiableList(descriptors); @@ -162,19 +197,32 @@ public class ListenSyslog extends AbstractSyslogProcessor { } @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { return descriptors; } @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - // since properties were changed, clear any events that were queued - if (syslogEvents != null) { - syslogEvents.clear(); + // if we are changing the protocol, the events that we may have queued up are no longer valid, as they + // were received using a different protocol and may be from a completely different source + if (PROTOCOL.equals(descriptor)) { + if (syslogEvents != null) { + syslogEvents.clear(); + } + if (errorEvents != null) { + errorEvents.clear(); + } } - if (errorEvents != null) { - errorEvents.clear(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + if (validationContext.getProperty(MAX_BATCH_SIZE).asInteger() > 1 && validationContext.getProperty(PARSE_MESSAGES).asBoolean()) { + results.add(new ValidationResult.Builder().subject("Parse Messages").input("true").valid(false) + .explanation("Cannot set Parse Messages to 'true' if Batch Size is greater than 1").build()); } + return results; } @OnScheduled @@ -184,21 +232,26 @@ public class ListenSyslog extends AbstractSyslogProcessor { final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); - final int maxConnections; + final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + final String charsetName = context.getProperty(CHARSET).getValue(); + messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charsetName)); + final int maxConnections; if (protocol.equals(UDP_VALUE.getValue())) { maxConnections = 1; } else { maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); } + bufferPool = new LinkedBlockingQueue<>(maxConnections); + for (int i = 0; i < maxConnections; i++) { + bufferPool.offer(ByteBuffer.allocate(bufferSize)); + } + parser = new SyslogParser(Charset.forName(charSet)); - bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE); - syslogEvents = new LinkedBlockingQueue<>(10); - errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); // create either a UDP or TCP reader and call open() to bind to the given port - channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections); + channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections); channelReader.open(port, maxChannelBufferSize); final Thread readerThread = new Thread(channelReader); @@ -207,13 +260,19 @@ public class ListenSyslog extends AbstractSyslogProcessor { readerThread.start(); } + // visible for testing. + protected SyslogParser getParser() { + return parser; + } + // visible for testing to be overridden and provide a mock ChannelReader if desired - protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) + protected ChannelReader createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, + int maxConnections) throws IOException { if (protocol.equals(UDP_VALUE.getValue())) { - return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger()); + return new DatagramChannelReader(bufferPool, syslogEvents, getLogger()); } else { - return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections); + return new SocketChannelReader(bufferPool, syslogEvents, getLogger(), maxConnections); } } @@ -230,66 +289,179 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } + protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue) { + RawSyslogEvent rawSyslogEvent = null; + if (pollErrorQueue) { + rawSyslogEvent = errorEvents.poll(); + } + + if (rawSyslogEvent == null) { + try { + if (longPoll) { + rawSyslogEvent = syslogEvents.poll(100, TimeUnit.MILLISECONDS); + } else { + rawSyslogEvent = syslogEvents.poll(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + + return rawSyslogEvent; + } + + protected int getErrorQueueSize() { + return errorEvents.size(); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - // try to pull from the error queue first, if empty then pull from main queue - SyslogEvent initialEvent = errorEvents.poll(); - if (initialEvent == null) { - initialEvent = syslogEvents.poll(); - } + // poll the queue with a small timeout to avoid unnecessarily yielding below + RawSyslogEvent rawSyslogEvent = getMessage(true, true); - // if nothing in either queue then yield and return - if (initialEvent == null) { + // if nothing in the queue then yield and return + if (rawSyslogEvent == null) { context.yield(); return; } - final SyslogEvent event = initialEvent; + final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); + final String port = context.getProperty(PORT).getValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); - final Map<String,String> attributes = new HashMap<>(); - attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority()); - attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity()); - attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility()); - attributes.put(SyslogAttributes.VERSION.key(), event.getVersion()); - attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp()); - attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName()); - attributes.put(SyslogAttributes.SENDER.key(), event.getSender()); - attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody()); - attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid())); - attributes.put(SyslogAttributes.PROTOCOL.key(), protocol); - attributes.put(SyslogAttributes.PORT.key(), port); - attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - - final String transitUri = new StringBuilder().append(protocol).append("://").append(event.getSender()) - .append(":").append(port).toString(); - - try { - // write the raw bytes of the message as the FlowFile content - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(event.getRawMessage()); + final Map<String, String> defaultAttributes = new HashMap<>(4); + defaultAttributes.put(SyslogAttributes.PROTOCOL.key(), protocol); + defaultAttributes.put(SyslogAttributes.PORT.key(), port); + defaultAttributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + + final int numAttributes = SyslogAttributes.values().length + 2; + final boolean shouldParse = context.getProperty(PARSE_MESSAGES).asBoolean(); + + final Map<String, FlowFile> flowFilePerSender = new HashMap<>(); + final SyslogParser parser = getParser(); + + for (int i = 0; i < maxBatchSize; i++) { + SyslogEvent event = null; + + // If this is our first iteration, we have already polled our queues. Otherwise, poll on each iteration. + if (i > 0) { + rawSyslogEvent = getMessage(false, false); + + if (rawSyslogEvent == null) { + break; } - }); - - if (event.isValid()) { - getLogger().info("Transferring {} to success", new Object[]{flowFile}); - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().receive(flowFile, transitUri); - } else { - getLogger().info("Transferring {} to invalid", new Object[]{flowFile}); - session.transfer(flowFile, REL_INVALID); } - } catch (ProcessException e) { - getLogger().error("Error processing Syslog message", e); - errorEvents.offer(event); - session.remove(flowFile); + final String sender = rawSyslogEvent.getSender(); + FlowFile flowFile = flowFilePerSender.get(sender); + if (flowFile == null) { + flowFile = session.create(); + flowFilePerSender.put(sender, flowFile); + } + + if (shouldParse) { + boolean valid = true; + try { + event = parser.parseEvent(rawSyslogEvent.getRawMessage(), sender); + } catch (final ProcessException pe) { + getLogger().warn("Failed to parse Syslog event; routing to invalid"); + valid = false; + } + + // If the event is invalid, route it to 'invalid' and then stop. + // We create a separate FlowFile for this case instead of using 'flowFile', + // because the 'flowFile' object may already have data written to it. + if (!valid || !event.isValid()) { + FlowFile invalidFlowFile = session.create(); + invalidFlowFile = session.putAllAttributes(invalidFlowFile, defaultAttributes); + if (sender != null) { + invalidFlowFile = session.putAttribute(invalidFlowFile, SyslogAttributes.SENDER.key(), sender); + } + + try { + final byte[] rawBytes = rawSyslogEvent.getRawMessage(); + invalidFlowFile = session.write(invalidFlowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(rawBytes); + } + }); + } catch (final Exception e) { + getLogger().error("Failed to write contents of Syslog message to FlowFile due to {}; will re-queue message and try again", e); + errorEvents.offer(rawSyslogEvent); + session.remove(invalidFlowFile); + break; + } + + session.transfer(invalidFlowFile, REL_INVALID); + break; + } + + getLogger().trace(event.getFullMessage()); + + final Map<String, String> attributes = new HashMap<>(numAttributes); + attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority()); + attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity()); + attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility()); + attributes.put(SyslogAttributes.VERSION.key(), event.getVersion()); + attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp()); + attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName()); + attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody()); + attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid())); + + flowFile = session.putAllAttributes(flowFile, attributes); + } + + // figure out if we should write the bytes from the raw event or parsed event + final boolean writeDemarcator = (i > 0); + + try { + // write the raw bytes of the message as the FlowFile content + final byte[] rawMessage = (event == null) ? rawSyslogEvent.getRawMessage() : event.getRawMessage(); + flowFile = session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + if (writeDemarcator) { + out.write(messageDemarcatorBytes); + } + + out.write(rawMessage); + } + }); + } catch (final Exception e) { + getLogger().error("Failed to write contents of Syslog message to FlowFile due to {}; will re-queue message and try again", e); + errorEvents.offer(rawSyslogEvent); + break; + } + + session.adjustCounter("Messages Received", 1L, false); + flowFilePerSender.put(sender, flowFile); + } + + + for (final Map.Entry<String, FlowFile> entry : flowFilePerSender.entrySet()) { + final String sender = entry.getKey(); + FlowFile flowFile = entry.getValue(); + + if (flowFile.getSize() == 0L) { + session.remove(flowFile); + getLogger().debug("No data written to FlowFile from Sender {}; removing FlowFile", new Object[] {sender}); + continue; + } + + final Map<String, String> newAttributes = new HashMap<>(defaultAttributes.size() + 1); + newAttributes.putAll(defaultAttributes); + newAttributes.put(SyslogAttributes.SENDER.key(), sender); + flowFile = session.putAllAttributes(flowFile, newAttributes); + + getLogger().debug("Transferring {} to success", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; + final String transitUri = new StringBuilder().append(protocol.toLowerCase()).append("://").append(senderHost).append(":").append(port).toString(); + session.getProvenanceReporter().receive(flowFile, transitUri); } } @@ -313,18 +485,15 @@ public class ListenSyslog extends AbstractSyslogProcessor { */ public static class DatagramChannelReader implements ChannelReader { - private final BufferPool bufferPool; - private final SyslogParser syslogParser; - private final BlockingQueue<SyslogEvent> syslogEvents; + private final BlockingQueue<ByteBuffer> bufferPool; + private final BlockingQueue<RawSyslogEvent> syslogEvents; private final ProcessorLog logger; private DatagramChannel datagramChannel; private volatile boolean stopped = false; private Selector selector; - public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, - final ProcessorLog logger) { + public DatagramChannelReader(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) { this.bufferPool = bufferPool; - this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; } @@ -360,17 +529,22 @@ public class ListenSyslog extends AbstractSyslogProcessor { continue; } DatagramChannel channel = (DatagramChannel) key.channel(); - SocketAddress sender; + SocketAddress socketAddress; buffer.clear(); - while (!stopped && (sender = channel.receive(buffer)) != null) { - final SyslogEvent event; - if (sender instanceof InetSocketAddress) { - event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString()); - } else { - event = syslogParser.parseEvent(buffer); + while (!stopped && (socketAddress = channel.receive(buffer)) != null) { + String sender = ""; + if (socketAddress instanceof InetSocketAddress) { + sender = ((InetSocketAddress) socketAddress).getAddress().toString(); } - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available + + // create a byte array from the buffer + buffer.flip(); + byte bytes[] = new byte[buffer.limit()]; + buffer.get(bytes, 0, buffer.limit()); + + // queue the raw message with the sender, block until space is available + syslogEvents.put(new RawSyslogEvent(bytes, sender)); + buffer.clear(); } } } @@ -380,8 +554,13 @@ public class ListenSyslog extends AbstractSyslogProcessor { logger.error("Error reading from DatagramChannel", e); } } + if (buffer != null) { - bufferPool.returnBuffer(buffer, 0); + try { + bufferPool.put(buffer); + } catch (InterruptedException e) { + // nothing to do here + } } } @@ -409,9 +588,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { */ public static class SocketChannelReader implements ChannelReader { - private final BufferPool bufferPool; - private final SyslogParser syslogParser; - private final BlockingQueue<SyslogEvent> syslogEvents; + private final BlockingQueue<ByteBuffer> bufferPool; + private final BlockingQueue<RawSyslogEvent> syslogEvents; private final ProcessorLog logger; private final ExecutorService executor; private volatile boolean stopped = false; @@ -420,10 +598,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final int maxConnections; private final AtomicInteger currentConnections = new AtomicInteger(0); - public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, - final ProcessorLog logger, final int maxConnections) { + public SocketChannelReader(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger, final int maxConnections) { this.bufferPool = bufferPool; - this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; this.maxConnections = maxConnections; @@ -486,8 +662,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { // Clear out the operations the select is interested in until done reading key.interestOps(0); // Create and execute the read handler - final SocketChannelHandler handler = new SocketChannelHandler(key, this, - syslogParser, syslogEvents, logger); + final SocketChannelHandler handler = new SocketChannelHandler(key, this, syslogEvents, logger); // and launch the thread executor.execute(handler); } @@ -546,7 +721,11 @@ public class ListenSyslog extends AbstractSyslogProcessor { public void completeConnection(SelectionKey key) { // connection is done. Return the buffer to the pool - bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0); + try { + bufferPool.put((ByteBuffer) key.attachment()); + } catch (InterruptedException e) { + // nothing to do here + } currentConnections.decrementAndGet(); } @@ -565,16 +744,13 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final SelectionKey key; private final SocketChannelReader dispatcher; - private final SyslogParser syslogParser; - private final BlockingQueue<SyslogEvent> syslogEvents; + private final BlockingQueue<RawSyslogEvent> syslogEvents; private final ProcessorLog logger; private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser, - final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) { + public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) { this.key = key; this.dispatcher = dispatcher; - this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; } @@ -609,11 +785,9 @@ public class ListenSyslog extends AbstractSyslogProcessor { // check if at end of a message if (currByte == '\n') { - // parse an event, reset the buffer - final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(), - socketChannel.socket().getInetAddress().toString()); - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available + String sender = socketChannel.socket().getInetAddress().toString(); + // queue the raw event blocking until space is available, reset the buffer + syslogEvents.put(new RawSyslogEvent(currBytes.toByteArray(), sender)); currBytes.reset(); // Mark this as the start of the next message socketBuffer.mark(); @@ -655,4 +829,25 @@ public class ListenSyslog extends AbstractSyslogProcessor { + "maximum receive buffer"); } + // Wrapper class to pass around the raw message and the host/ip that sent it + public static class RawSyslogEvent { + + final byte[] rawMessage; + final String sender; + + public RawSyslogEvent(byte[] rawMessage, String sender) { + this.rawMessage = rawMessage; + this.sender = sender; + } + + public byte[] getRawMessage() { + return this.rawMessage; + } + + public String getSender() { + return this.sender; + } + + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java new file mode 100644 index 0000000..1490cc2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes; +import org.apache.nifi.processors.standard.util.SyslogEvent; +import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.stream.io.StreamUtils; + + +@EventDriven +@SideEffectFree +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"logs", "syslog", "attributes", "system", "event", "message"}) +@CapabilityDescription("Parses the contents of a Syslog message and adds attributes to the FlowFile for each of the parts of the Syslog message") +@WritesAttributes({@WritesAttribute(attribute = "syslog.priority", description = "The priority of the Syslog message."), + @WritesAttribute(attribute = "syslog.severity", description = "The severity of the Syslog message derived from the priority."), + @WritesAttribute(attribute = "syslog.facility", description = "The facility of the Syslog message derived from the priority."), + @WritesAttribute(attribute = "syslog.version", description = "The optional version from the Syslog message."), + @WritesAttribute(attribute = "syslog.timestamp", description = "The timestamp of the Syslog message."), + @WritesAttribute(attribute = "syslog.hostname", description = "The hostname of the Syslog message."), + @WritesAttribute(attribute = "syslog.sender", description = "The hostname of the Syslog server that sent the message."), + @WritesAttribute(attribute = "syslog.body", description = "The body of the Syslog message, everything after the hostname.")}) +@SeeAlso({ListenSyslog.class, PutSyslog.class}) +public class ParseSyslog extends AbstractProcessor { + + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies which character set of the Syslog messages") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that could not be parsed as a Syslog message will be transferred to this Relationship without any attributes being added") + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully parsed as a Syslog message will be to this Relationship.") + .build(); + + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(1); + properties.add(CHARSET); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_FAILURE); + relationships.add(REL_SUCCESS); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String charsetName = context.getProperty(CHARSET).getValue(); + final SyslogParser parser = new SyslogParser(Charset.forName(charsetName)); + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + final SyslogEvent event; + try { + event = parser.parseEvent(buffer, null); + } catch (final ProcessException pe) { + getLogger().error("Failed to parse {} as a Syslog message due to {}; routing to failure", new Object[] {flowFile, pe}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + if (!event.isValid()) { + getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final Map<String, String> attributes = new HashMap<>(8); + attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority()); + attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity()); + attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility()); + attributes.put(SyslogAttributes.VERSION.key(), event.getVersion()); + attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp()); + attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName()); + attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody()); + + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 733c113..9cb6508 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -20,6 +20,7 @@ import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -64,6 +65,7 @@ import java.util.regex.Pattern; "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " + "above description, then it is routed to the invalid relationship. Valid messages are sent to the Syslog server and successes are routed to the success relationship, " + "failures routed to the failure relationship.") +@SeeAlso({ListenSyslog.class, ParseSyslog.class}) public class PutSyslog extends AbstractSyslogProcessor { public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 56265a9..befa5e7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -53,6 +53,7 @@ org.apache.nifi.processors.standard.LogAttribute org.apache.nifi.processors.standard.MergeContent org.apache.nifi.processors.standard.ModifyBytes org.apache.nifi.processors.standard.MonitorActivity +org.apache.nifi.processors.standard.ParseSyslog org.apache.nifi.processors.standard.PostHTTP org.apache.nifi.processors.standard.PutEmail org.apache.nifi.processors.standard.PutDistributedMapCache http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index f0eb345..d9dc8f0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -16,34 +16,40 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.io.nio.BufferPool; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.standard.ListenSyslog.RawSyslogEvent; import org.apache.nifi.processors.standard.util.SyslogEvent; import org.apache.nifi.processors.standard.util.SyslogParser; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.IntegerHolder; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.BlockingQueue; - public class TestListenSyslog { static final Logger LOGGER = LoggerFactory.getLogger(TestListenSyslog.class); @@ -100,8 +106,7 @@ public class TestListenSyslog { final ProvenanceEventRecord event = events.get(0); Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0", - event.getTransitUri()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp")); } finally { // unschedule to close connections @@ -151,8 +156,7 @@ public class TestListenSyslog { final ProvenanceEventRecord event = events.get(0); Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0", - event.getTransitUri()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); } finally { // unschedule to close connections @@ -203,8 +207,7 @@ public class TestListenSyslog { final ProvenanceEventRecord event = events.get(0); Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); - Assert.assertEquals(ListenSyslog.TCP_VALUE.getValue() + "://" + flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()) + ":0", - event.getTransitUri()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp")); } finally { // unschedule to close connections @@ -213,6 +216,57 @@ public class TestListenSyslog { } @Test + public void testBatching() throws IOException, InterruptedException { + final ListenSyslog proc = new ListenSyslog(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "25"); + runner.setProperty(ListenSyslog.MESSAGE_DELIMITER, "|"); + runner.setProperty(ListenSyslog.PARSE_MESSAGES, "false"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + final int numMessages = 20; + final int port = proc.getPort(); + Assert.assertTrue(port > 0); + + // write some UDP messages to the port in the background + final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE.replaceAll("\\n", ""))); + sender.setDaemon(true); + sender.start(); + sender.join(); + + try { + proc.onTrigger(context, processSessionFactory); + runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); + Assert.assertEquals("0", flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key())); + Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key())); + Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()))); + + final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8); + final String[] splits = content.split("\\|"); + Assert.assertEquals(20, splits.length); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp")); + } finally { + // unschedule to close connections + proc.onUnscheduled(); + } + } + + @Test public void testInvalid() throws IOException, InterruptedException { final ListenSyslog proc = new ListenSyslog(); final TestRunner runner = TestRunners.newTestRunner(proc); @@ -254,25 +308,69 @@ public class TestListenSyslog { } @Test - public void testErrorQueue() { - final SyslogEvent event1 = Mockito.mock(SyslogEvent.class); - Mockito.when(event1.getRawMessage()).thenThrow(new ProcessException("ERROR")); - - final SyslogEvent event2 = new SyslogEvent.Builder() - .facility("fac").severity("sev") - .fullMessage("abc").hostname("host") - .msgBody("body").timestamp("123").valid(true) - .rawMessage("abc".getBytes(Charset.forName("UTF-8"))) - .build(); - - final MockProcessor proc = new MockProcessor(Arrays.asList(event1, event2)); + public void testParsingError() throws IOException { + final FailParseProcessor proc = new FailParseProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ListenSyslog.PORT, "12345"); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + try { + final int port = proc.getPort(); + final DatagramSender sender = new DatagramSender(port, 1, 1, INVALID_MESSAGE); + sender.run(); + + // should keep re-processing event1 from the error queue + proc.onTrigger(context, processSessionFactory); + runner.assertTransferCount(ListenSyslog.REL_INVALID, 1); + runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0); + } finally { + proc.onUnscheduled(); + } + } + + @Test + public void testErrorQueue() throws IOException { + final List<RawSyslogEvent> msgs = new ArrayList<>(); + msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01")); + msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01")); + + // Add message that will throw a FlowFileAccessException the first time that we attempt to read + // the contents but will succeeed the second time. + final IntegerHolder getMessageAttempts = new IntegerHolder(0); + msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01") { + @Override + public byte[] getRawMessage() { + final int attempts = getMessageAttempts.incrementAndGet(); + if (attempts == 1) { + throw new FlowFileAccessException("Unit test failure"); + } else { + return VALID_MESSAGE.getBytes(); + } + } + }); - // should keep re-processing event1 from the error queue - runner.run(3); - runner.assertTransferCount(ListenSyslog.REL_INVALID, 0); - runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0); + final CannedMessageProcessor proc = new CannedMessageProcessor(msgs); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "5"); + runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue()); + runner.setProperty(ListenSyslog.PORT, "0"); + runner.setProperty(ListenSyslog.PARSE_MESSAGES, "false"); + + runner.run(); + assertEquals(1, proc.getErrorQueueSize()); + runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE + "\n" + VALID_MESSAGE); + + // running again should pull from the error queue + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE); } @@ -420,46 +518,39 @@ public class TestListenSyslog { } // A mock version of ListenSyslog that will queue the provided events - private static class MockProcessor extends ListenSyslog { - - private List<SyslogEvent> eventList; - - public MockProcessor(List<SyslogEvent> eventList) { - this.eventList = eventList; - } - + private static class FailParseProcessor extends ListenSyslog { @Override - protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, - final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) { - return new ChannelReader() { - @Override - public void open(int port, int maxBufferSize) throws IOException { - - } - + protected SyslogParser getParser() { + return new SyslogParser(StandardCharsets.UTF_8) { @Override - public int getPort() { - return 0; + public SyslogEvent parseEvent(byte[] bytes, String sender) { + throw new ProcessException("Unit test intentionally failing"); } + }; + } + } - @Override - public void stop() { - - } + private static class CannedMessageProcessor extends ListenSyslog { + private final Iterator<RawSyslogEvent> eventItr; - @Override - public void close() { + public CannedMessageProcessor(final List<RawSyslogEvent> events) { + this.eventItr = events.iterator(); + } - } + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.remove(PORT); + properties.add(new PropertyDescriptor.Builder().name(PORT.getName()).addValidator(Validator.VALID).build()); + return properties; + } - @Override - public void run() { - for (SyslogEvent event : eventList) { - syslogEvents.offer(event); - } - } - }; + @Override + protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue) { + if (eventItr.hasNext()) { + return eventItr.next(); + } + return super.getMessage(longPoll, pollErrorQueue); } } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/e5281f1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java new file mode 100644 index 0000000..a1a4d04 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestParseSyslog { + static final String PRI = "34"; + static final String SEV = "2"; + static final String FAC = "4"; + static final String TIME = "Oct 13 15:43:23"; + static final String HOST = "localhost.home"; + static final String BODY = "some message"; + + static final String VALID_MESSAGE_RFC3164_0 = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n"; + + @Test + public void testSuccessfulParse3164() { + final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog()); + runner.enqueue(VALID_MESSAGE_RFC3164_0.getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ParseSyslog.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).get(0); + mff.assertAttributeEquals(SyslogAttributes.BODY.key(), BODY); + mff.assertAttributeEquals(SyslogAttributes.FACILITY.key(), FAC); + mff.assertAttributeEquals(SyslogAttributes.HOSTNAME.key(), HOST); + mff.assertAttributeEquals(SyslogAttributes.PRIORITY.key(), PRI); + mff.assertAttributeEquals(SyslogAttributes.SEVERITY.key(), SEV); + mff.assertAttributeEquals(SyslogAttributes.TIMESTAMP.key(), TIME); + } + + + @Test + public void testInvalidMessage() { + final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog()); + runner.enqueue("<hello> yesterday localhost\n".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ParseSyslog.REL_FAILURE, 1); + } +}
