Repository: nifi Updated Branches: refs/heads/master 4ac52bfc0 -> 98f5a1ab7
NIFI-1110: Fixed bug that caused queue size to become negative when FlowFiles are expired Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/98f5a1ab Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/98f5a1ab Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/98f5a1ab Branch: refs/heads/master Commit: 98f5a1ab779eb3d49f95eda36844d1398af1a1f6 Parents: 4ac52bf Author: Mark Payne <[email protected]> Authored: Wed Nov 4 14:24:08 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Thu Nov 5 12:12:02 2015 -0500 ---------------------------------------------------------------------- .../nifi/controller/StandardFlowFileQueue.java | 41 ++++++++++++++++---- .../controller/TestStandardFlowFileQueue.java | 29 ++++++++++++++ 2 files changed, 63 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/98f5a1ab/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 3986ca8..0b3c661 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -395,7 +395,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { migrateSwapToActive(); final boolean queueFullAtStart = queueFullRef.get(); - int expiredRecordCount = 0; long expiredBytes = 0L; do { @@ -404,8 +403,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis)); if (isExpired) { expiredRecords.add(flowFile); - expiredRecordCount++; expiredBytes += flowFile.getSize(); + flowFile = null; if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { break; @@ -419,12 +418,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (flowFile != null) { incrementActiveQueueSize(-1, -flowFile.getSize()); } - - if (expiredRecordCount > 0) { - incrementActiveQueueSize(-expiredRecordCount, -expiredBytes); - } } while (isExpired); + if (!expiredRecords.isEmpty()) { + incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes); + } + // if at least 1 FlowFile was expired & the queue was full before we started, then // we need to determine whether or not the queue is full again. If no FlowFile was expired, // then the queue will still be full until the appropriate #acknowledge method is called. @@ -432,7 +431,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { queueFullRef.set(determineIfFull()); } - return isExpired ? null : flowFile; + return flowFile; } @Override @@ -1198,6 +1197,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes + bytes, original.swappedCount, original.swappedBytes, original.unacknowledgedCount, original.unacknowledgedBytes); updated = size.compareAndSet(original, newSize); + + if (updated) { + logIfNegative(original, newSize, "active"); + } } } @@ -1208,6 +1211,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes, original.swappedCount + count, original.swappedBytes + bytes, original.unacknowledgedCount, original.unacknowledgedBytes); updated = size.compareAndSet(original, newSize); + + if (updated) { + logIfNegative(original, newSize, "swap"); + } } } @@ -1218,6 +1225,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes, original.swappedCount, original.swappedBytes, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes); updated = size.compareAndSet(original, newSize); + + if (updated) { + logIfNegative(original, newSize, "Unacknowledged"); + } + } + } + + private void logIfNegative(final FlowFileQueueSize original, final FlowFileQueueSize newSize, final String counterName) { + if (newSize.activeQueueBytes < 0 || newSize.activeQueueCount < 0 || newSize.swappedBytes < 0 || newSize.swappedCount < 0 || + newSize.unacknowledgedBytes < 0 || newSize.unacknowledgedCount < 0) { + + logger.error("Updated Size of Queue " + counterName + " from " + original + " to " + newSize, new RuntimeException("Cannot create negative queue size")); + } } @@ -1259,5 +1279,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { public QueueSize swapQueueSize() { return new QueueSize(swappedCount, swappedBytes); } + + @Override + public String toString() { + return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + ", " + activeQueueBytes + + " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes + + " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/98f5a1ab/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 7ef5fc8..8b8c678 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -76,6 +77,34 @@ public class TestStandardFlowFileQueue { TestFlowFile.idGenerator.set(0L); } + @Test + public void testExpire() { + queue.setFlowFileExpiration("1 ms"); + + for (int i = 0; i < 100; i++) { + queue.put(new TestFlowFile()); + } + + // just make sure that the flowfiles have time to expire. + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + } + + final Set<FlowFileRecord> expiredRecords = new HashSet<>(100); + final FlowFileRecord pulled = queue.poll(expiredRecords); + + assertNull(pulled); + assertEquals(100, expiredRecords.size()); + + final QueueSize activeSize = queue.getActiveQueueSize(); + assertEquals(0, activeSize.getObjectCount()); + assertEquals(0L, activeSize.getByteCount()); + + final QueueSize unackSize = queue.getUnacknowledgedQueueSize(); + assertEquals(0, unackSize.getObjectCount()); + assertEquals(0L, unackSize.getByteCount()); + } @Test public void testSwapOutOccurs() {
