Repository: nifi Updated Branches: refs/heads/master 6776060ac -> 19e53962c
NIFI-1579 Performance improvements for ListenSyslog: removed an unnecessary yield, exposed a configurable size for the internal queue used by the processor, changed ListenSyslog to use a 20ms poll and a long poll when batching, and applied the same improvements to ListenRELP Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/19e53962 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/19e53962 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/19e53962 Branch: refs/heads/master Commit: 19e53962ca45fd00a46efd671b674d388ff10053 Parents: 6776060 Author: Bryan Bende <[email protected]> Authored: Tue Mar 1 12:54:52 2016 -0500 Committer: Bryan Bende <[email protected]> Committed: Fri Mar 4 09:17:45 2016 -0500 ---------------------------------------------------------------------- .../listen/AbstractListenEventProcessor.java | 25 ++++++++++---- .../nifi/processors/standard/ListenRELP.java | 6 ++-- .../nifi/processors/standard/ListenSyslog.java | 34 +++++++++++++++----- .../processors/standard/TestListenSyslog.java | 5 +-- 4 files changed, 52 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index 029f6db..d56255d 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -85,6 +85,15 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst .defaultValue("1 MB") .required(true) .build(); + public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Message Queue") + .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " + + "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " + + "memory used by the processor.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .required(true) + .build(); // Putting these properties here so sub-classes don't have to redefine them, but they are // not added to the properties by default since not all processors may need them @@ -119,7 +128,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst .description("Messages received successfully will be sent out this relationship.") .build(); - public static final int POLL_TIMEOUT_MS = 100; + public static final int POLL_TIMEOUT_MS = 20; private Set<Relationship> relationships; private List<PropertyDescriptor> descriptors; @@ -127,7 +136,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst protected volatile int port; protected volatile Charset charset; protected volatile ChannelDispatcher dispatcher; - protected volatile BlockingQueue<E> events = new LinkedBlockingQueue<>(10); + protected volatile BlockingQueue<E> events; protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue<>(); @Override @@ -135,6 +144,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst final List<PropertyDescriptor> descriptors = new ArrayList<>(); 
descriptors.add(PORT); descriptors.add(RECV_BUFFER_SIZE); + descriptors.add(MAX_MESSAGE_QUEUE_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE); descriptors.add(CHARSET); descriptors.addAll(getAdditionalProperties()); @@ -178,6 +188,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst public void onScheduled(final ProcessContext context) throws IOException { charset = Charset.forName(context.getProperty(CHARSET).getValue()); port = context.getProperty(PORT).asInteger(); + events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger()); final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); @@ -230,7 +241,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst * * @return an event from one of the queues, or null if none are available */ - protected E getMessage(final boolean longPoll, final boolean pollErrorQueue) { + protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) { E event = null; if (pollErrorQueue) { event = errorEvents.poll(); @@ -249,6 +260,10 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst } } + if (event != null) { + session.adjustCounter("Messages Received", 1L, false); + } + return event; } @@ -270,7 +285,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst final Map<String,FlowFileEventBatch> batches = new HashMap<>(); for (int i=0; i < totalBatchSize; i++) { - final E event = getMessage(true, true); + final E event = getMessage(true, true, session); if (event == null) { break; } @@ -311,8 +326,6 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst errorEvents.offer(event); break; } - - session.adjustCounter("Messages Received", 1L, false); } return batches; 
http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java index 99e1830..c386bdc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java @@ -133,9 +133,10 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> { final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes); - // if the size is 0 then there was nothing to process so yield and return + // if the size is 0 then there was nothing to process so return + // we don't need to yield here because inside getBatches() we are polling a queue with a wait + // and yielding here could have a negative impact on performance if (batches.size() == 0) { - context.yield(); return; } @@ -170,6 +171,7 @@ public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> { getLogger().debug("Transferring {} to success", new Object[] {flowFile}); session.transfer(flowFile, REL_SUCCESS); + session.adjustCounter("FlowFiles Transferred to Success", 1L, false); // create a provenance receive event final String senderHost = sender.startsWith("/") && sender.length() > 1 ? 
sender.substring(1) : sender; http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 8ca7eb1..1ec406d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -98,6 +98,15 @@ import java.util.concurrent.TimeUnit; @SeeAlso({PutSyslog.class, ParseSyslog.class}) public class ListenSyslog extends AbstractSyslogProcessor { + public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Message Queue") + .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " + + "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " + + "memory used by the processor.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .required(true) + .build(); public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() .name("Receive Buffer Size") .description("The size of each buffer used to receive Syslog messages. 
Adjust this value appropriately based on the expected size of the " + @@ -171,7 +180,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { private volatile ChannelDispatcher channelDispatcher; private volatile SyslogParser parser; private volatile BlockingQueue<ByteBuffer> bufferPool; - private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new LinkedBlockingQueue<>(10); + private volatile BlockingQueue<RawSyslogEvent> syslogEvents; private volatile BlockingQueue<RawSyslogEvent> errorEvents = new LinkedBlockingQueue<>(); private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents. @@ -182,6 +191,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { descriptors.add(PORT); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(RECV_BUFFER_SIZE); + descriptors.add(MAX_MESSAGE_QUEUE_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE); descriptors.add(MAX_CONNECTIONS); descriptors.add(MAX_BATCH_SIZE); @@ -245,6 +255,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { final int port = context.getProperty(PORT).asInteger(); final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final int maxMessageQueueSize = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger(); final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); @@ -263,6 +274,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { } parser = new SyslogParser(Charset.forName(charSet)); + syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize); // create either a UDP or TCP reader and call open() to bind to the given port final 
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); @@ -313,7 +325,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } - protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue) { + protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) { RawSyslogEvent rawSyslogEvent = null; if (pollErrorQueue) { rawSyslogEvent = errorEvents.poll(); @@ -322,7 +334,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { if (rawSyslogEvent == null) { try { if (longPoll) { - rawSyslogEvent = syslogEvents.poll(100, TimeUnit.MILLISECONDS); + rawSyslogEvent = syslogEvents.poll(20, TimeUnit.MILLISECONDS); } else { rawSyslogEvent = syslogEvents.poll(); } @@ -332,6 +344,10 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } + if (rawSyslogEvent != null) { + session.adjustCounter("Messages Received", 1L, false); + } + return rawSyslogEvent; } @@ -342,11 +358,12 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { // poll the queue with a small timeout to avoid unnecessarily yielding below - RawSyslogEvent rawSyslogEvent = getMessage(true, true); + RawSyslogEvent rawSyslogEvent = getMessage(true, true, session); - // if nothing in the queue then yield and return + // if nothing in the queue just return, we don't want to yield here because yielding could adversely + // impact performance, and we already have a long poll in getMessage so there will be some built in + // throttling even when no data is available if (rawSyslogEvent == null) { - context.yield(); return; } @@ -372,7 +389,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { // If this is our first iteration, we have already polled our queues. Otherwise, poll on each iteration. 
if (i > 0) { - rawSyslogEvent = getMessage(false, false); + rawSyslogEvent = getMessage(true, false, session); if (rawSyslogEvent == null) { break; @@ -461,7 +478,6 @@ public class ListenSyslog extends AbstractSyslogProcessor { break; } - session.adjustCounter("Messages Received", 1L, false); flowFilePerSender.put(sender, flowFile); } @@ -483,6 +499,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { getLogger().debug("Transferring {} to success", new Object[] {flowFile}); session.transfer(flowFile, REL_SUCCESS); + session.adjustCounter("FlowFiles Transferred to Success", 1L, false); + final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; final String transitUri = new StringBuilder().append(protocol.toLowerCase()).append("://").append(senderHost).append(":").append(port).toString(); session.getProvenanceReporter().receive(flowFile, transitUri); http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 360dfe7..cd8621c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.processor.ProcessContext; +import 
org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.ProcessException; @@ -602,11 +603,11 @@ public class TestListenSyslog { } @Override - protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue) { + protected RawSyslogEvent getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) { if (eventItr.hasNext()) { return eventItr.next(); } - return super.getMessage(longPoll, pollErrorQueue); + return super.getMessage(longPoll, pollErrorQueue, session); } } }
