Repository: nifi
Updated Branches:
  refs/heads/master 6776060ac -> 19e53962c


NIFI-1579 Performance improvements for ListenSyslog: remove an 
unnecessary yield, expose a configurable size for the processor's internal 
queue, reduce the poll timeout to 20 ms, and use a long poll 
when batching. The same improvements are also applied to ListenRELP.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/19e53962
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/19e53962
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/19e53962

Branch: refs/heads/master
Commit: 19e53962ca45fd00a46efd671b674d388ff10053
Parents: 6776060
Author: Bryan Bende <[email protected]>
Authored: Tue Mar 1 12:54:52 2016 -0500
Committer: Bryan Bende <[email protected]>
Committed: Fri Mar 4 09:17:45 2016 -0500

----------------------------------------------------------------------
 .../listen/AbstractListenEventProcessor.java    | 25 ++++++++++----
 .../nifi/processors/standard/ListenRELP.java    |  6 ++--
 .../nifi/processors/standard/ListenSyslog.java  | 34 +++++++++++++++-----
 .../processors/standard/TestListenSyslog.java   |  5 +--
 4 files changed, 52 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 029f6db..d56255d 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -85,6 +85,15 @@ public abstract class AbstractListenEventProcessor<E extends 
Event> extends Abst
             .defaultValue("1 MB")
             .required(true)
             .build();
+    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Size of Message Queue")
+            .description("The maximum size of the internal queue used to 
buffer messages being transferred from the underlying channel to the processor. 
" +
+                    "Setting this value higher allows more messages to be 
buffered in memory during surges of incoming messages, but increases the total 
" +
+                    "memory used by the processor.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000")
+            .required(true)
+            .build();
 
     // Putting these properties here so sub-classes don't have to redefine 
them, but they are
     // not added to the properties by default since not all processors may 
need them
@@ -119,7 +128,7 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
             .description("Messages received successfully will be sent out this 
relationship.")
             .build();
 
-    public static final int POLL_TIMEOUT_MS = 100;
+    public static final int POLL_TIMEOUT_MS = 20;
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
@@ -127,7 +136,7 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
     protected volatile int port;
     protected volatile Charset charset;
     protected volatile ChannelDispatcher dispatcher;
-    protected volatile BlockingQueue<E> events = new LinkedBlockingQueue<>(10);
+    protected volatile BlockingQueue<E> events;
     protected volatile BlockingQueue<E> errorEvents = new 
LinkedBlockingQueue<>();
 
     @Override
@@ -135,6 +144,7 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(PORT);
         descriptors.add(RECV_BUFFER_SIZE);
+        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(CHARSET);
         descriptors.addAll(getAdditionalProperties());
@@ -178,6 +188,7 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
     public void onScheduled(final ProcessContext context) throws IOException {
         charset = Charset.forName(context.getProperty(CHARSET).getValue());
         port = context.getProperty(PORT).asInteger();
+        events = new 
LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
 
         final int maxChannelBufferSize = 
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
 
@@ -230,7 +241,7 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
      *
      * @return an event from one of the queues, or null if none are available
      */
-    protected E getMessage(final boolean longPoll, final boolean 
pollErrorQueue) {
+    protected E getMessage(final boolean longPoll, final boolean 
pollErrorQueue, final ProcessSession session) {
         E event = null;
         if (pollErrorQueue) {
             event = errorEvents.poll();
@@ -249,6 +260,10 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
             }
         }
 
+        if (event != null) {
+            session.adjustCounter("Messages Received", 1L, false);
+        }
+
         return event;
     }
 
@@ -270,7 +285,7 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
 
         final Map<String,FlowFileEventBatch> batches = new HashMap<>();
         for (int i=0; i < totalBatchSize; i++) {
-            final E event = getMessage(true, true);
+            final E event = getMessage(true, true, session);
             if (event == null) {
                 break;
             }
@@ -311,8 +326,6 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
                 errorEvents.offer(event);
                 break;
             }
-
-            session.adjustCounter("Messages Received", 1L, false);
         }
 
         return batches;

http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
index 99e1830..c386bdc 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
@@ -133,9 +133,10 @@ public class ListenRELP extends 
AbstractListenEventProcessor<RELPEvent> {
         final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
         final Map<String,FlowFileEventBatch> batches = getBatches(session, 
maxBatchSize, messageDemarcatorBytes);
 
-        // if the size is 0 then there was nothing to process so yield and 
return
+        // if the size is 0 then there was nothing to process so return
+        // we don't need to yield here because inside getBatches() we are 
polling a queue with a wait
+        // and yielding here could have a negative impact on performance
         if (batches.size() == 0) {
-            context.yield();
             return;
         }
 
@@ -170,6 +171,7 @@ public class ListenRELP extends 
AbstractListenEventProcessor<RELPEvent> {
 
             getLogger().debug("Transferring {} to success", new Object[] 
{flowFile});
             session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, 
false);
 
             // create a provenance receive event
             final String senderHost = sender.startsWith("/") && 
sender.length() > 1 ? sender.substring(1) : sender;

http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 8ca7eb1..1ec406d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -98,6 +98,15 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso({PutSyslog.class, ParseSyslog.class})
 public class ListenSyslog extends AbstractSyslogProcessor {
 
+    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Size of Message Queue")
+            .description("The maximum size of the internal queue used to 
buffer messages being transferred from the underlying channel to the processor. 
" +
+                    "Setting this value higher allows more messages to be 
buffered in memory during surges of incoming messages, but increases the total 
" +
+                    "memory used by the processor.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000")
+            .required(true)
+            .build();
     public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
             .name("Receive Buffer Size")
             .description("The size of each buffer used to receive Syslog 
messages. Adjust this value appropriately based on the expected size of the " +
@@ -171,7 +180,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     private volatile ChannelDispatcher channelDispatcher;
     private volatile SyslogParser parser;
     private volatile BlockingQueue<ByteBuffer> bufferPool;
-    private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new 
LinkedBlockingQueue<>(10);
+    private volatile BlockingQueue<RawSyslogEvent> syslogEvents;
     private volatile BlockingQueue<RawSyslogEvent> errorEvents = new 
LinkedBlockingQueue<>();
     private volatile byte[] messageDemarcatorBytes; //it is only the array 
reference that is volatile - not the contents.
 
@@ -182,6 +191,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(PORT);
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(RECV_BUFFER_SIZE);
+        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(MAX_CONNECTIONS);
         descriptors.add(MAX_BATCH_SIZE);
@@ -245,6 +255,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         final int port = context.getProperty(PORT).asInteger();
         final int bufferSize = 
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final int maxChannelBufferSize = 
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final int maxMessageQueueSize = 
context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
         final String msgDemarcator = 
context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
@@ -263,6 +274,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         }
 
         parser = new SyslogParser(Charset.forName(charSet));
+        syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
 
         // create either a UDP or TCP reader and call open() to bind to the 
given port
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
@@ -313,7 +325,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         }
     }
 
-    protected RawSyslogEvent getMessage(final boolean longPoll, final boolean 
pollErrorQueue) {
+    protected RawSyslogEvent getMessage(final boolean longPoll, final boolean 
pollErrorQueue, final ProcessSession session) {
         RawSyslogEvent rawSyslogEvent = null;
         if (pollErrorQueue) {
             rawSyslogEvent = errorEvents.poll();
@@ -322,7 +334,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         if (rawSyslogEvent == null) {
             try {
                 if (longPoll) {
-                    rawSyslogEvent = syslogEvents.poll(100, 
TimeUnit.MILLISECONDS);
+                    rawSyslogEvent = syslogEvents.poll(20, 
TimeUnit.MILLISECONDS);
                 } else {
                     rawSyslogEvent = syslogEvents.poll();
                 }
@@ -332,6 +344,10 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             }
         }
 
+        if (rawSyslogEvent != null) {
+            session.adjustCounter("Messages Received", 1L, false);
+        }
+
         return rawSyslogEvent;
     }
 
@@ -342,11 +358,12 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         // poll the queue with a small timeout to avoid unnecessarily yielding 
below
-        RawSyslogEvent rawSyslogEvent = getMessage(true, true);
+        RawSyslogEvent rawSyslogEvent = getMessage(true, true, session);
 
-        // if nothing in the queue then yield and return
+        // if nothing in the queue just return, we don't want to yield here 
because yielding could adversely
+        // impact performance, and we already have a long poll in getMessage 
so there will be some built in
+        // throttling even when no data is available
         if (rawSyslogEvent == null) {
-            context.yield();
             return;
         }
 
@@ -372,7 +389,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
             // If this is our first iteration, we have already polled our 
queues. Otherwise, poll on each iteration.
             if (i > 0) {
-                rawSyslogEvent = getMessage(false, false);
+                rawSyslogEvent = getMessage(true, false, session);
 
                 if (rawSyslogEvent == null) {
                     break;
@@ -461,7 +478,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 break;
             }
 
-            session.adjustCounter("Messages Received", 1L, false);
             flowFilePerSender.put(sender, flowFile);
         }
 
@@ -483,6 +499,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
             getLogger().debug("Transferring {} to success", new Object[] 
{flowFile});
             session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, 
false);
+
             final String senderHost = sender.startsWith("/") && 
sender.length() > 1 ? sender.substring(1) : sender;
             final String transitUri = new 
StringBuilder().append(protocol.toLowerCase()).append("://").append(senderHost).append(":").append(port).toString();
             session.getProvenanceReporter().receive(flowFile, transitUri);

http://git-wip-us.apache.org/repos/asf/nifi/blob/19e53962/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 360dfe7..cd8621c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -602,11 +603,11 @@ public class TestListenSyslog {
         }
 
         @Override
-        protected RawSyslogEvent getMessage(final boolean longPoll, final 
boolean pollErrorQueue) {
+        protected RawSyslogEvent getMessage(final boolean longPoll, final 
boolean pollErrorQueue, final ProcessSession session) {
             if (eventItr.hasNext()) {
                 return eventItr.next();
             }
-            return super.getMessage(longPoll, pollErrorQueue);
+            return super.getMessage(longPoll, pollErrorQueue, session);
         }
     }
 }

Reply via email to