NIFI-1155: Ensure that when poll(FlowFileFilter, Set) is called, we properly update the indicator for whether or not queue is full

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3ed0949c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3ed0949c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3ed0949c

Branch: refs/heads/master
Commit: 3ed0949c5578f379cab7b90ff32778bf6296404a
Parents: e608642
Author: Mark Payne <[email protected]>
Authored: Thu Nov 12 07:48:57 2015 -0500
Committer: joewitt <[email protected]>
Committed: Fri Nov 13 00:06:12 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  |  41 ++++---
 .../controller/TestStandardFlowFileQueue.java   | 123 +++++++++++++++++++
 2 files changed, 150 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3ed0949c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index ae991c8..dd74250 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -80,19 +80,30 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
 
     private PriorityQueue<FlowFileRecord> activeQueue = null;
+
+    // guarded by lock
     private ArrayList<FlowFileRecord> swapQueue = null;
 
     private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
 
     private boolean swapMode = false;
+
+    // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it.
     private volatile String maximumQueueDataSize;
     private volatile long maximumQueueByteCount;
     private volatile long maximumQueueObjectCount;
 
-    private final EventReporter eventReporter;
+    // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it.
     private final AtomicLong flowFileExpirationMillis;
-    private final Connection connection;
     private final AtomicReference<String> flowFileExpirationPeriod;
+
+    // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max
+    private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
+
+    // TODO: Unit test better!
+
+    private final EventReporter eventReporter;
+    private final Connection connection;
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     private final List<FlowFilePrioritizer> priorities;
     private final int swapThreshold;
@@ -106,8 +117,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final ProvenanceEventRepository provRepository;
     private final ResourceClaimManager resourceClaimManager;
 
-    private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
-
     // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
     private final ProcessScheduler scheduler;
 
@@ -683,13 +692,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
         long bytesPulled = 0L;
         int flowFilesPulled = 0;
+        boolean queueFullAtStart = false;
 
         writeLock.lock();
         try {
             migrateSwapToActive();
 
             final long expirationMillis = this.flowFileExpirationMillis.get();
-            final boolean queueFullAtStart = queueFullRef.get();
+            queueFullAtStart = queueFullRef.get();
 
             final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
             final List<FlowFileRecord> unselected = new ArrayList<>();
@@ -735,17 +745,20 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
             this.activeQueue.addAll(unselected);
 
-            // if at least 1 FlowFile was expired & the queue was full before we started, then
-            // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-            // then the queue will still be full until the appropriate #acknowledge method is called.
-            if (queueFullAtStart && !expiredRecords.isEmpty()) {
-                queueFullRef.set(determineIfFull());
-            }
-
             return selectedFlowFiles;
         } finally {
-            incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
-            writeLock.unlock("poll(Filter, Set)");
+            try {
+                incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
+
+                // if at least 1 FlowFile was expired & the queue was full before we started, then
+                // we need to determine whether or not the queue is full again. If no FlowFile was expired,
+                // then the queue will still be full until the appropriate #acknowledge method is called.
+                if (queueFullAtStart && !expiredRecords.isEmpty()) {
+                    queueFullRef.set(determineIfFull());
+                }
+            } finally {
+                writeLock.unlock("poll(Filter, Set)");
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ed0949c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 8b8c678..61f96fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.controller;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -48,6 +49,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.junit.Before;
@@ -107,6 +109,127 @@ public class TestStandardFlowFileQueue {
     }
 
     @Test
+    public void testBackPressure() {
+        queue.setBackPressureObjectThreshold(10);
+
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertFalse(queue.isFull());
+
+        for (int i = 0; i < 9; i++) {
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+            assertFalse(queue.isEmpty());
+            assertFalse(queue.isActiveQueueEmpty());
+        }
+
+        queue.put(new TestFlowFile());
+        assertTrue(queue.isFull());
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final FlowFileRecord polled = queue.poll(expiredRecords);
+        assertNotNull(polled);
+        assertTrue(expiredRecords.isEmpty());
+
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+
+        // queue is still full because FlowFile has not yet been acknowledged.
+        assertTrue(queue.isFull());
+        queue.acknowledge(polled);
+
+        // FlowFile has been acknowledged; queue should no longer be full.
+        assertFalse(queue.isFull());
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+    }
+
+    @Test
+    public void testBackPressureAfterPollFilter() throws InterruptedException {
+        queue.setBackPressureObjectThreshold(10);
+        queue.setFlowFileExpiration("10 millis");
+
+        for (int i = 0; i < 9; i++) {
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+        }
+
+        queue.put(new TestFlowFile());
+        assertTrue(queue.isFull());
+
+        Thread.sleep(100L);
+
+
+        final FlowFileFilter filter = new FlowFileFilter() {
+            @Override
+            public FlowFileFilterResult filter(final FlowFile flowFile) {
+                return FlowFileFilterResult.REJECT_AND_CONTINUE;
+            }
+        };
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final List<FlowFileRecord> polled = queue.poll(filter, expiredRecords);
+        assertTrue(polled.isEmpty());
+        assertEquals(10, expiredRecords.size());
+
+        assertFalse(queue.isFull());
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+    }
+
+    @Test
+    public void testBackPressureAfterPollSingle() throws InterruptedException {
+        queue.setBackPressureObjectThreshold(10);
+        queue.setFlowFileExpiration("10 millis");
+
+        for (int i = 0; i < 9; i++) {
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+        }
+
+        queue.put(new TestFlowFile());
+        assertTrue(queue.isFull());
+
+        Thread.sleep(100L);
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final FlowFileRecord polled = queue.poll(expiredRecords);
+        assertNull(polled);
+        assertEquals(10, expiredRecords.size());
+
+        assertFalse(queue.isFull());
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+    }
+
+    @Test
+    public void testBackPressureAfterPollMultiple() throws InterruptedException {
+        queue.setBackPressureObjectThreshold(10);
+        queue.setFlowFileExpiration("10 millis");
+
+        for (int i = 0; i < 9; i++) {
+            queue.put(new TestFlowFile());
+            assertFalse(queue.isFull());
+        }
+
+        queue.put(new TestFlowFile());
+        assertTrue(queue.isFull());
+
+        Thread.sleep(100L);
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final List<FlowFileRecord> polled = queue.poll(10, expiredRecords);
+        assertTrue(polled.isEmpty());
+        assertEquals(10, expiredRecords.size());
+
+        assertFalse(queue.isFull());
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+    }
+
+    @Test
     public void testSwapOutOccurs() {
         for (int i = 0; i < 10000; i++) {
             queue.put(new TestFlowFile());

Reply via email to