Repository: nifi
Updated Branches:
  refs/heads/master 883c223ce -> 9324a2a74


NIFI-4476 Improving logic for determining when to yield in 
PutTCP/UDP/Syslog/Splunk

Signed-off-by: Pierre Villard <[email protected]>

This closes #2204.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9324a2a7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9324a2a7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9324a2a7

Branch: refs/heads/master
Commit: 9324a2a74200e96dce9574e1b0105e9ac4a46871
Parents: 883c223
Author: Bryan Bende <[email protected]>
Authored: Mon Oct 9 16:17:40 2017 -0400
Committer: Pierre Villard <[email protected]>
Committed: Tue Oct 10 09:02:02 2017 +0200

----------------------------------------------------------------------
 .../util/put/AbstractPutEventProcessor.java     | 36 ++++++++++++++++++-
 .../nifi/processors/splunk/PutSplunk.java       |  7 ++--
 .../nifi/processors/standard/PutSyslog.java     | 38 ++++++++++++++++++--
 .../apache/nifi/processors/standard/PutTCP.java |  7 ++--
 .../apache/nifi/processors/standard/PutUDP.java |  7 ++--
 5 files changed, 85 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index a246272..d09fe06 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -252,21 +252,28 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
      * Close any senders that haven't been active with in the given threshold
      *
      * @param idleThreshold the threshold to consider a sender as idle
+     * @return the number of connections that were closed as a result of being 
idle
      */
-    protected void pruneIdleSenders(final long idleThreshold) {
+    protected PruneResult pruneIdleSenders(final long idleThreshold) {
+        int numClosed = 0;
+        int numConsidered = 0;
+
         long currentTime = System.currentTimeMillis();
         final List<ChannelSender> putBack = new ArrayList<>();
 
         // if a connection hasn't been used with in the threshold then it gets 
closed
         ChannelSender sender;
         while ((sender = senderPool.poll()) != null) {
+            numConsidered++;
             if (currentTime > (sender.getLastUsed() + idleThreshold)) {
                 getLogger().debug("Closing idle connection...");
                 sender.close();
+                numClosed++;
             } else {
                 putBack.add(sender);
             }
         }
+
         // re-queue senders that weren't idle, but if the queue is full then 
close the sender
         for (ChannelSender putBackSender : putBack) {
             boolean returned = senderPool.offer(putBackSender);
@@ -274,6 +281,8 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
                 putBackSender.close();
             }
         }
+
+        return new PruneResult(numClosed, numConsidered);
     }
 
     /**
@@ -372,6 +381,31 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
     }
 
     /**
+     * The results from pruning connections.
+     */
+    protected static class PruneResult {
+
+        private final int numClosed;
+
+        private final int numConsidered;
+
+        public PruneResult(final int numClosed, final int numConsidered) {
+            this.numClosed = numClosed;
+            this.numConsidered = numConsidered;
+        }
+
+        public int getNumClosed() {
+            return numClosed;
+        }
+
+        public int getNumConsidered() {
+            return numConsidered;
+        }
+
+    }
+
+
+    /**
      * Represents a range of messages from a FlowFile.
      */
     protected static class Range {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 57ea812..d674192 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -138,8 +138,11 @@ public class PutSplunk extends AbstractPutEventProcessor {
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            context.yield();
+            final PruneResult result = 
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+            // yield if we closed an idle connection, or if there were no 
connections in the first place
+            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && 
result.getNumConsidered() == 0)) {
+                context.yield();
+            }
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 412d8ab..63f17ba 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -265,20 +265,26 @@ public class PutSyslog extends AbstractSyslogProcessor {
         }
     }
 
-    private void pruneIdleSenders(final long idleThreshold){
+    private PruneResult pruneIdleSenders(final long idleThreshold){
+        int numClosed = 0;
+        int numConsidered = 0;
+
         long currentTime = System.currentTimeMillis();
         final List<ChannelSender> putBack = new ArrayList<>();
 
         // if a connection hasn't been used with in the threshold then it gets 
closed
         ChannelSender sender;
         while ((sender = senderPool.poll()) != null) {
+            numConsidered++;
             if (currentTime > (sender.getLastUsed() + idleThreshold)) {
                 getLogger().debug("Closing idle connection...");
                 sender.close();
+                numClosed++;
             } else {
                 putBack.add(sender);
             }
         }
+
         // re-queue senders that weren't idle, but if the queue is full then 
close the sender
         for (ChannelSender putBackSender : putBack) {
             boolean returned = senderPool.offer(putBackSender);
@@ -286,6 +292,8 @@ public class PutSyslog extends AbstractSyslogProcessor {
                 putBackSender.close();
             }
         }
+
+        return new PruneResult(numClosed, numConsidered);
     }
 
     @Override
@@ -295,8 +303,11 @@ public class PutSyslog extends AbstractSyslogProcessor {
 
         final List<FlowFile> flowFiles = session.get(batchSize);
         if (flowFiles == null || flowFiles.isEmpty()) {
-            
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            context.yield();
+            final PruneResult result = 
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+            // yield if we closed an idle connection, or if there were no 
connections in the first place
+            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && 
result.getNumConsidered() == 0)) {
+                context.yield();
+            }
             return;
         }
 
@@ -394,4 +405,25 @@ public class PutSyslog extends AbstractSyslogProcessor {
         return false;
     }
 
+    private static class PruneResult {
+
+        private final int numClosed;
+
+        private final int numConsidered;
+
+        public PruneResult(final int numClosed, final int numConsidered) {
+            this.numClosed = numClosed;
+            this.numConsidered = numConsidered;
+        }
+
+        public int getNumClosed() {
+            return numClosed;
+        }
+
+        public int getNumConsidered() {
+            return numConsidered;
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index ee3e645..a8deab2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -168,8 +168,11 @@ public class PutTCP extends AbstractPutEventProcessor {
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            context.yield();
+            final PruneResult result = 
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+            // yield if we closed an idle connection, or if there were no 
connections in the first place
+            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && 
result.getNumConsidered() == 0)) {
+                context.yield();
+            }
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
index af23c54..3157930 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
@@ -131,8 +131,11 @@ public class PutUDP extends AbstractPutEventProcessor {
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            context.yield();
+            final PruneResult result = 
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+            // yield if we closed an idle connection, or if there were no 
connections in the first place
+            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && 
result.getNumConsidered() == 0)) {
+                context.yield();
+            }
             return;
         }
 

Reply via email to