Repository: nifi Updated Branches: refs/heads/master 883c223ce -> 9324a2a74
NIFI-4476 Improving logic for determining when to yield in PutTCP/UDP/Syslog/Splunk Signed-off-by: Pierre Villard <[email protected]> This closes #2204. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9324a2a7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9324a2a7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9324a2a7 Branch: refs/heads/master Commit: 9324a2a74200e96dce9574e1b0105e9ac4a46871 Parents: 883c223 Author: Bryan Bende <[email protected]> Authored: Mon Oct 9 16:17:40 2017 -0400 Committer: Pierre Villard <[email protected]> Committed: Tue Oct 10 09:02:02 2017 +0200 ---------------------------------------------------------------------- .../util/put/AbstractPutEventProcessor.java | 36 ++++++++++++++++++- .../nifi/processors/splunk/PutSplunk.java | 7 ++-- .../nifi/processors/standard/PutSyslog.java | 38 ++++++++++++++++++-- .../apache/nifi/processors/standard/PutTCP.java | 7 ++-- .../apache/nifi/processors/standard/PutUDP.java | 7 ++-- 5 files changed, 85 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java index a246272..d09fe06 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -252,21 +252,28 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr * Close any senders that haven't been active with in the given threshold * * @param idleThreshold the threshold to consider a sender as idle + * @return the number of connections that were closed as a result of being idle */ - protected void pruneIdleSenders(final long idleThreshold) { + protected PruneResult pruneIdleSenders(final long idleThreshold) { + int numClosed = 0; + int numConsidered = 0; + long currentTime = System.currentTimeMillis(); final List<ChannelSender> putBack = new ArrayList<>(); // if a connection hasn't been used with in the threshold then it gets closed ChannelSender sender; while ((sender = senderPool.poll()) != null) { + numConsidered++; if (currentTime > (sender.getLastUsed() + idleThreshold)) { getLogger().debug("Closing idle connection..."); sender.close(); + numClosed++; } else { putBack.add(sender); } } + // re-queue senders that weren't idle, but if the queue is full then close the sender for (ChannelSender putBackSender : putBack) { boolean returned = senderPool.offer(putBackSender); @@ -274,6 +281,8 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr putBackSender.close(); } } + + return new PruneResult(numClosed, numConsidered); } /** @@ -372,6 +381,31 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr } /** + * The results from pruning connections. + */ + protected static class PruneResult { + + private final int numClosed; + + private final int numConsidered; + + public PruneResult(final int numClosed, final int numConsidered) { + this.numClosed = numClosed; + this.numConsidered = numConsidered; + } + + public int getNumClosed() { + return numClosed; + } + + public int getNumConsidered() { + return numConsidered; + } + + } + + + /** * Represents a range of messages from a FlowFile. */ protected static class Range { http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 57ea812..d674192 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -138,8 +138,11 @@ public class PutSplunk extends AbstractPutEventProcessor { final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 412d8ab..63f17ba 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -265,20 +265,26 @@ public class PutSyslog extends AbstractSyslogProcessor { } } - private void pruneIdleSenders(final long idleThreshold){ + private PruneResult pruneIdleSenders(final long idleThreshold){ + int numClosed = 0; + int numConsidered = 0; + long currentTime = System.currentTimeMillis(); final List<ChannelSender> putBack = new ArrayList<>(); // if a connection hasn't been used with in the threshold then it gets closed ChannelSender sender; while ((sender = senderPool.poll()) != null) { + numConsidered++; if (currentTime > (sender.getLastUsed() + idleThreshold)) { getLogger().debug("Closing idle connection..."); sender.close(); + numClosed++; } else { putBack.add(sender); } } + // re-queue senders that weren't idle, but if the queue is full then close the sender for (ChannelSender putBackSender : putBack) { boolean returned = senderPool.offer(putBackSender); @@ -286,6 +292,8 @@ public class PutSyslog extends AbstractSyslogProcessor { putBackSender.close(); } } + + return new PruneResult(numClosed, numConsidered); } @Override @@ -295,8 +303,11 @@ public class PutSyslog extends AbstractSyslogProcessor { final List<FlowFile> flowFiles = session.get(batchSize); if (flowFiles == null || flowFiles.isEmpty()) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } @@ -394,4 +405,25 @@ public class PutSyslog extends AbstractSyslogProcessor { return false; } + private static class PruneResult { + + private final int numClosed; + + private final int numConsidered; + + public PruneResult(final int numClosed, final int numConsidered) { + this.numClosed = numClosed; + this.numConsidered = numConsidered; + } + + public int getNumClosed() { + return numClosed; + } + + public int getNumConsidered() { + return numConsidered; + } + + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java index ee3e645..a8deab2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java @@ -168,8 +168,11 @@ public class PutTCP extends AbstractPutEventProcessor { final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } http://git-wip-us.apache.org/repos/asf/nifi/blob/9324a2a7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java index af23c54..3157930 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java @@ -131,8 +131,11 @@ public class PutUDP extends AbstractPutEventProcessor { final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; }
