NIFI-730: Implemented swapping in and out on-demand by the FlowFileQueue rather 
than in a background thread


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/49a781df
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/49a781df
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/49a781df

Branch: refs/heads/master
Commit: 49a781df2d44859ec59672c2755b7346452cd74a
Parents: b8c51dc
Author: Mark Payne <[email protected]>
Authored: Mon Oct 12 13:27:07 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Oct 13 10:03:03 2015 -0400

----------------------------------------------------------------------
 .../controller/queue/DropFlowFileState.java     |   3 +-
 .../controller/queue/DropFlowFileStatus.java    |   7 +-
 .../nifi/controller/queue/FlowFileQueue.java    |  50 --
 .../apache/nifi/controller/queue/QueueSize.java |   7 +
 .../repository/FlowFileSwapManager.java         |  17 +-
 .../SwapManagerInitializationContext.java       |   1 -
 .../java/org/apache/nifi/util/MockFlowFile.java |  24 +-
 .../org/apache/nifi/util/MockFlowFileQueue.java |   3 +-
 .../apache/nifi/util/MockProcessSession.java    |  15 +-
 .../nifi/util/StandardProcessorTestRunner.java  |  24 +-
 .../java/org/apache/nifi/util/TestRunner.java   | 124 ++--
 .../nifi/controller/DropFlowFileRequest.java    |  99 +++
 .../nifi/controller/StandardFlowFileQueue.java  | 687 +++++++++----------
 .../controller/TestStandardFlowFileQueue.java   | 330 +++++++++
 .../nifi/connectable/StandardConnection.java    |  33 +-
 .../nifi/controller/FileSystemSwapManager.java  |  54 +-
 .../apache/nifi/controller/FlowController.java  |  24 +-
 .../repository/StandardProcessSession.java      | 102 +--
 .../java/org/apache/nifi/util/Connectables.java |   2 +-
 .../controller/TestFileSystemSwapManager.java   | 142 +++-
 .../repository/TestStandardProcessSession.java  |   2 +-
 .../nifi/web/controller/ControllerFacade.java   |  30 +-
 22 files changed, 1155 insertions(+), 625 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
index 3f16d00..e412b80 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
@@ -25,7 +25,8 @@ public enum DropFlowFileState {
     WAITING_FOR_LOCK("Waiting for Destination Component to complete its 
action"),
     DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"),
     COMPLETE("Completed Successfully"),
-    FAILURE("Failed");
+    FAILURE("Failed"),
+    CANCELED("Cancelled by User");
     
     private final String description;
     

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index b216608..3c3be9b 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -43,6 +43,12 @@ public interface DropFlowFileStatus {
     long getRequestSubmissionTime();
 
     /**
+     * @return the date/time (in milliseconds since epoch) at which the status 
of the
+     *         request was last updated
+     */
+    long getLastUpdated();
+
+    /**
      * @return the size of the queue when the drop request was issued or 
<code>null</code> if
      *         it is not yet known, which can happen if the {@link 
DropFlowFileState} is
      *         {@link DropFlowFileState#WAITING_FOR_LOCK}.
@@ -58,5 +64,4 @@ public interface DropFlowFileStatus {
      * @return the current state of the operation
      */
     DropFlowFileState getState();
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index 31f17e0..bc2f358 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -60,13 +60,6 @@ public interface FlowFileQueue {
     void purgeSwapFiles();
 
     /**
-     * @return the minimum number of FlowFiles that must be present in order 
for
-     *         FlowFiles to begin being swapped out of the queue
-     */
-    // TODO: REMOVE THIS.
-    int getSwapThreshold();
-
-    /**
      * Resets the comparator used by this queue to maintain order.
      *
      * @param newPriorities the ordered list of prioritizers to use to 
determine
@@ -112,12 +105,8 @@ public interface FlowFileQueue {
      *         not include those FlowFiles that have been swapped out or are 
currently
      *         being processed
      */
-    // TODO: REMOVE?
     boolean isActiveQueueEmpty();
 
-    // TODO: REMOVE?
-    QueueSize getActiveQueueSize();
-
     /**
      * Returns a QueueSize that represents all FlowFiles that are 
'unacknowledged'. A FlowFile
      * is considered to be unacknowledged if it has been pulled from the queue 
by some component
@@ -152,45 +141,6 @@ public interface FlowFileQueue {
     void putAll(Collection<FlowFileRecord> files);
 
     /**
-     * Removes all records from the internal swap queue and returns them.
-     *
-     * @return all removed records from internal swap queue
-     */
-    // TODO: REMOVE THIS?
-    List<FlowFileRecord> pollSwappableRecords();
-
-    /**
-     * Restores the records from swap space into this queue, adding the records
-     * that have expired to the given set instead of enqueuing them.
-     *
-     * @param records that were swapped in
-     */
-    // TODO: REMOVE THIS?
-    void putSwappedRecords(Collection<FlowFileRecord> records);
-
-    /**
-     * Updates the internal counters of how much data is queued, based on
-     * swapped data that is being restored.
-     *
-     * @param numRecords count of records swapped in
-     * @param contentSize total size of records being swapped in
-     */
-    // TODO: REMOVE THIS?
-    void incrementSwapCount(int numRecords, long contentSize);
-
-    /**
-     * @return the number of FlowFiles that are enqueued and not swapped
-     */
-    // TODO: REMOVE THIS?
-    int unswappedSize();
-
-    // TODO: REMOVE THIS?
-    int getSwapRecordCount();
-
-    // TODO: REMOVE THIS?
-    int getSwapQueueSize();
-
-    /**
      * @param expiredRecords expired records
      * @return the next flow file on the queue; null if empty
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
index 42d8416..528d652 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.controller.queue;
 
+import java.text.NumberFormat;
+
 /**
  *
  */
@@ -45,4 +47,9 @@ public class QueueSize {
     public long getByteCount() {
         return totalSizeBytes;
     }
+
+    @Override
+    public String toString() {
+        return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + 
NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 57e9186..a70d287 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -26,6 +26,9 @@ import org.apache.nifi.controller.queue.QueueSize;
  * Defines a mechanism by which FlowFiles can be move into external storage or
  * memory so that they can be removed from the Java heap and vice-versa
  */
+// TODO: This needs to be refactored into two different mechanisms, one that 
is responsible for doing
+// framework-y types of things, such as updating the repositories, and another 
that is responsible
+// for serializing and deserializing FlowFiles to external storage.
 public interface FlowFileSwapManager {
 
     /**
@@ -38,6 +41,16 @@ public interface FlowFileSwapManager {
     void initialize(SwapManagerInitializationContext initializationContext);
 
     /**
+     * Drops all FlowFiles that are swapped out at the given location. This 
will update the Provenance
+     * Repository as well as the FlowFile Repository.
+     *
+     * @param swapLocation the location of the swap file to drop
+     * @param flowFileQueue the queue to which the FlowFiles belong
+     * @param user the user that initiated the request
+     */
+    void dropSwappedFlowFiles(String swapLocation, FlowFileQueue 
flowFileQueue, String user) throws IOException;
+
+    /**
      * Swaps out the given FlowFiles that belong to the queue with the given 
identifier.
      *
      * @param flowFiles the FlowFiles to swap out to external storage
@@ -53,13 +66,13 @@ public interface FlowFileSwapManager {
      * provides a view of the FlowFiles but does not actively swap them in, 
meaning that the swap file
      * at the given location remains in that location and the FlowFile 
Repository is not updated.
      *
-     * @param swapLocation the location of hte swap file
+     * @param swapLocation the location of the swap file
      * @param flowFileQueue the queue that the FlowFiles belong to
      * @return the FlowFiles that live at the given swap location
      *
      * @throws IOException if unable to recover the FlowFiles from the given 
location
      */
-    List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue 
flowFileQueue) throws IOException;
+    List<FlowFileRecord> peek(String swapLocation, FlowFileQueue 
flowFileQueue) throws IOException;
 
     /**
      * Recovers the FlowFiles from the swap file that lives at the given 
location and belongs

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
index 564d5ec..0e30784 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
@@ -27,7 +27,6 @@ public interface SwapManagerInitializationContext {
      */
     FlowFileRepository getFlowFileRepository();
 
-
     /**
      * @return the {@link ResourceClaimManager} that is necessary to provide 
to the FlowFileRepository when
      *         performing swapping actions

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index e9fb9d6..41bcc74 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -33,12 +33,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
 import org.junit.Assert;
 
-public class MockFlowFile implements FlowFile {
+public class MockFlowFile implements FlowFileRecord {
 
     private final Map<String, String> attributes = new HashMap<>();
 
@@ -170,7 +171,7 @@ public class MockFlowFile implements FlowFile {
 
     public void assertAttributeNotExists(final String attributeName) {
         Assert.assertFalse("Attribute " + attributeName + " not exists with 
value " + attributes.get(attributeName),
-                attributes.containsKey(attributeName));
+            attributes.containsKey(attributeName));
     }
 
     public void assertAttributeEquals(final String attributeName, final String 
expectedValue) {
@@ -250,7 +251,7 @@ public class MockFlowFile implements FlowFile {
 
                 if ((fromStream & 0xFF) != (data[i] & 0xFF)) {
                     Assert.fail("FlowFile content differs from input at byte " 
+ bytesRead + " with input having value "
-                            + (fromStream & 0xFF) + " and FlowFile having 
value " + (data[i] & 0xFF));
+                        + (fromStream & 0xFF) + " and FlowFile having value " 
+ (data[i] & 0xFF));
                 }
 
                 bytesRead++;
@@ -274,4 +275,19 @@ public class MockFlowFile implements FlowFile {
     public Long getLastQueueDate() {
         return entryDate;
     }
+
+    @Override
+    public long getPenaltyExpirationMillis() {
+        return -1;
+    }
+
+    @Override
+    public ContentClaim getContentClaim() {
+        return null;
+    }
+
+    @Override
+    public long getContentClaimOffset() {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java
index 775a1d5..0c6ec2a 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java
@@ -23,7 +23,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.controller.queue.QueueSize;
+
 
 public class MockFlowFileQueue {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 1060854..85fc784 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -40,12 +40,12 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.FlowFileHandlingException;
@@ -691,7 +691,7 @@ public class MockProcessSession implements ProcessSession {
     /**
      * @param relationship to get flowfiles for
      * @return a List of FlowFiles in the order in which they were transferred
-     * to the given relationship
+     *         to the given relationship
      */
     public List<MockFlowFile> getFlowFilesForRelationship(final String 
relationship) {
         final Relationship procRel = new 
Relationship.Builder().name(relationship).build();
@@ -778,7 +778,7 @@ public class MockProcessSession implements ProcessSession {
      */
     private FlowFile inheritAttributes(final FlowFile source, final FlowFile 
destination) {
         if (source == null || destination == null || source == destination) {
-            return destination; //don't need to inherit from ourselves
+            return destination; // don't need to inherit from ourselves
         }
         final FlowFile updated = putAllAttributes(destination, 
source.getAttributes());
         getProvenanceReporter().fork(source, 
Collections.singletonList(updated));
@@ -801,7 +801,7 @@ public class MockProcessSession implements ProcessSession {
         int uuidsCaptured = 0;
         for (final FlowFile source : sources) {
             if (source == destination) {
-                continue; //don't want to capture parent uuid of this.  
Something can't be a child of itself
+                continue; // don't want to capture parent uuid of this. 
Something can't be a child of itself
             }
             final String sourceUuid = 
source.getAttribute(CoreAttributes.UUID.key());
             if (sourceUuid != null && !sourceUuid.trim().isEmpty()) {
@@ -832,7 +832,7 @@ public class MockProcessSession implements ProcessSession {
      */
     private static Map<String, String> intersectAttributes(final 
Collection<FlowFile> flowFileList) {
         final Map<String, String> result = new HashMap<>();
-        //trivial cases
+        // trivial cases
         if (flowFileList == null || flowFileList.isEmpty()) {
             return result;
         } else if (flowFileList.size() == 1) {
@@ -845,8 +845,7 @@ public class MockProcessSession implements ProcessSession {
          */
         final Map<String, String> firstMap = 
flowFileList.iterator().next().getAttributes();
 
-        outer:
-        for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+        outer: for (final Map.Entry<String, String> mapEntry : 
firstMap.entrySet()) {
             final String key = mapEntry.getKey();
             final String value = mapEntry.getValue();
             for (final FlowFile flowFile : flowFileList) {
@@ -900,7 +899,7 @@ public class MockProcessSession implements ProcessSession {
     public void assertTransferCount(final Relationship relationship, final int 
count) {
         final int transferCount = 
getFlowFilesForRelationship(relationship).size();
         Assert.assertEquals("Expected " + count + " FlowFiles to be 
transferred to "
-                + relationship + " but actual transfer count was " + 
transferCount, count, transferCount);
+            + relationship + " but actual transfer count was " + 
transferCount, count, transferCount);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index eeeff61..0d00cc8 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -58,12 +58,12 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
@@ -222,7 +222,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
             boolean unscheduledRun = false;
             for (final Future<Throwable> future : futures) {
                 try {
-                    final Throwable thrown = future.get();   // wait for the 
result
+                    final Throwable thrown = future.get(); // wait for the 
result
                     if (thrown != null) {
                         throw new AssertionError(thrown);
                     }
@@ -551,11 +551,11 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     @Override
     public void addControllerService(final String identifier, final 
ControllerService service, final Map<String, String> properties) throws 
InitializationException {
         // hold off on failing due to deprecated annotation for now... will 
introduce later.
-//        for ( final Method method : service.getClass().getMethods() ) {
-//            if ( 
method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class)
 ) {
-//                Assert.fail("Controller Service " + service + " is using 
deprecated Annotation " + 
org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + 
method);
-//            }
-//        }
+        // for ( final Method method : service.getClass().getMethods() ) {
+        // if ( 
method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class)
 ) {
+        // Assert.fail("Controller Service " + service + " is using deprecated 
Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for 
method " + method);
+        // }
+        // }
 
         final ComponentLog logger = new MockProcessorLog(identifier, service);
         final MockControllerServiceInitializationContext initContext = new 
MockControllerServiceInitializationContext(requireNonNull(service), 
requireNonNull(identifier), logger);
@@ -716,11 +716,11 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         final PropertyDescriptor descriptor = 
service.getPropertyDescriptor(propertyName);
         if (descriptor == null) {
             return new ValidationResult.Builder()
-                    .input(propertyName)
-                    .explanation(propertyName + " is not a known Property for 
Controller Service " + service)
-                    .subject("Invalid property")
-                    .valid(false)
-                    .build();
+                .input(propertyName)
+                .explanation(propertyName + " is not a known Property for 
Controller Service " + service)
+                .subject("Invalid property")
+                .valid(false)
+                .build();
         }
         return setProperty(service, descriptor, value);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 6e66bfe..ec901fe 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -26,11 +26,11 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
@@ -40,22 +40,22 @@ public interface TestRunner {
 
     /**
      * @return the {@link Processor} for which this <code>TestRunner</code> is
-     * configured
+     *         configured
      */
     Processor getProcessor();
 
     /**
      * @return the {@link ProcessSessionFactory} that this
-     * <code>TestRunner</code> will use to invoke the
-     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} 
method
+     *         <code>TestRunner</code> will use to invoke the
+     *         {@link Processor#onTrigger(ProcessContext, 
ProcessSessionFactory)} method
      */
     ProcessSessionFactory getProcessSessionFactory();
 
     /**
      * @return the {@Link ProcessContext} that this <code>TestRunner</code> 
will
-     * use to invoke the
-     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) 
onTrigger}
-     * method
+     *         use to invoke the
+     *         {@link Processor#onTrigger(ProcessContext, 
ProcessSessionFactory) onTrigger}
+     *         method
      */
     ProcessContext getProcessContext();
 
@@ -120,7 +120,7 @@ public interface TestRunner {
      *
      * @param iterations number of iterations
      * @param stopOnFinish whether or not to run the Processor methods that are
-     * annotated with {@link org.apache.nifi.processor.annotation.OnStopped 
@OnStopped}
+     *            annotated with {@link 
org.apache.nifi.processor.annotation.OnStopped @OnStopped}
      * @param initialize true if must initialize
      */
     void run(int iterations, boolean stopOnFinish, final boolean initialize);
@@ -163,10 +163,10 @@ public interface TestRunner {
      *
      * @param iterations number of iterations
      * @param stopOnFinish whether or not to run the Processor methods that are
-     * annotated with {@link org.apache.nifi.processor.annotation.OnStopped 
@OnStopped}
+     *            annotated with {@link 
org.apache.nifi.processor.annotation.OnStopped @OnStopped}
      * @param initialize true if must initialize
      * @param runWait indicates the amount of time in milliseconds that the 
framework should wait for
-     * processors to stop running before calling the {@link 
org.apache.nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
+     *            processors to stop running before calling the {@link 
org.apache.nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
      */
     void run(int iterations, boolean stopOnFinish, final boolean initialize, 
final long runWait);
 
@@ -187,8 +187,8 @@ public interface TestRunner {
 
     /**
      * @return the currently configured number of threads that will be used to
-     * runt he Processor when calling the {@link #run()} or {@link #run(int)}
-     * methods
+     *         run the Processor when calling the {@link #run()} or {@link 
#run(int)}
+     *         methods
      */
     int getThreadCount();
 
@@ -296,7 +296,7 @@ public interface TestRunner {
 
     /**
      * @return <code>true</code> if the Input Queue to the Processor is empty,
-     * <code>false</code> otherwise
+     *         <code>false</code> otherwise
      */
     boolean isQueueEmpty();
 
@@ -421,7 +421,7 @@ public interface TestRunner {
 
     /**
      * @return the {@link ProvenanceReporter} that will be used by the
-     * configured {@link Processor} for reporting Provenance Events
+     *         configured {@link Processor} for reporting Provenance Events
      */
     ProvenanceReporter getProvenanceReporter();
 
@@ -433,7 +433,7 @@ public interface TestRunner {
     /**
      * @param name of counter
      * @return the current value of the counter with the specified name, or 
null
-     * if no counter exists with the specified name
+     *         if no counter exists with the specified name
      */
     Long getCounterValue(String name);
 
@@ -599,14 +599,14 @@ public interface TestRunner {
     /**
      * @param service the service
      * @return {@code true} if the given Controller Service is enabled,
-     * {@code false} if it is disabled
+     *         {@code false} if it is disabled
      *
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via 
the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, 
Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      */
     boolean isControllerServiceEnabled(ControllerService service);
 
@@ -622,11 +622,11 @@ public interface TestRunner {
      *
      * @throws IllegalStateException if the ControllerService is not disabled
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via 
the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, 
Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     void removeControllerService(ControllerService service);
@@ -641,11 +641,11 @@ public interface TestRunner {
      *
      * @throws IllegalStateException if the ControllerService is not disabled
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via 
the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, 
Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     ValidationResult setProperty(ControllerService service, PropertyDescriptor 
property, String value);
@@ -660,11 +660,11 @@ public interface TestRunner {
      *
      * @throws IllegalStateException if the ControllerService is not disabled
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via 
the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, 
Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     ValidationResult setProperty(ControllerService service, PropertyDescriptor 
property, AllowableValue value);
@@ -679,11 +679,11 @@ public interface TestRunner {
      *
      * @throws IllegalStateException if the ControllerService is not disabled
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via 
the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, 
Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     ValidationResult setProperty(ControllerService service, String 
propertyName, String value);
@@ -698,19 +698,19 @@ public interface TestRunner {
      * @throws IllegalStateException if the Controller Service is not disabled
      *
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via 
the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, 
Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      */
     void setAnnotationData(ControllerService service, String annotationData);
 
     /**
      * @param identifier of controller service
      * @return the {@link ControllerService} that is registered with the given
-     * identifier, or <code>null</code> if no Controller Service exists with 
the
-     * given identifier
+     *         identifier, or <code>null</code> if no Controller Service 
exists with the
+     *         given identifier
      */
     ControllerService getControllerService(String identifier);
 
@@ -720,11 +720,11 @@ public interface TestRunner {
      *
      * @param service the service to validate
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via 
the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, 
Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      */
     void assertValid(ControllerService service);
 
@@ -734,11 +734,11 @@ public interface TestRunner {
      *
      * @param service the service to validate
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via 
the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, 
Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     void assertNotValid(ControllerService service);
@@ -748,12 +748,12 @@ public interface TestRunner {
      * @param identifier identifier of service
      * @param serviceType type of service
      * @return the {@link ControllerService} that is registered with the given
-     * identifier, cast as the provided service type, or <code>null</code> if 
no
-     * Controller Service exists with the given identifier
+     *         identifier, cast as the provided service type, or 
<code>null</code> if no
+     *         Controller Service exists with the given identifier
      *
      * @throws ClassCastException if the identifier given is registered for a
-     * Controller Service but that Controller Service is not of the type
-     * specified
+     *             Controller Service but that Controller Service is not of 
the type
+     *             specified
      */
     <T extends ControllerService> T getControllerService(String identifier, 
Class<T> serviceType);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
new file mode 100644
index 0000000..609fe75
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.QueueSize;
+
+/**
+ * Tracks the progress of a single request to drop FlowFiles, exposing it through
+ * the read-only {@link DropFlowFileStatus} view while the package-private setters
+ * let the code performing the drop update it.
+ * <p>
+ * Every mutable field is volatile so that a status read from another thread sees
+ * the most recent update without locking; state transitions additionally
+ * synchronize on this object.
+ */
+public class DropFlowFileRequest implements DropFlowFileStatus {
+    private final String identifier;
+    private final long submissionTime = System.currentTimeMillis();
+
+    private volatile QueueSize originalSize;
+    private volatile QueueSize currentSize;
+    private volatile long lastUpdated = System.currentTimeMillis();
+    private volatile Thread executionThread;
+
+    // volatile: getState() reads this without holding the monitor that setState() and cancel() take
+    private volatile DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
+
+
+    /**
+     * @param identifier the identifier reported by {@link #getRequestIdentifier()}
+     */
+    public DropFlowFileRequest(final String identifier) {
+        this.identifier = identifier;
+    }
+
+    @Override
+    public String getRequestIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public long getRequestSubmissionTime() {
+        return submissionTime;
+    }
+
+    @Override
+    public QueueSize getOriginalSize() {
+        return originalSize;
+    }
+
+    void setOriginalSize(final QueueSize originalSize) {
+        this.originalSize = originalSize;
+    }
+
+    @Override
+    public QueueSize getCurrentSize() {
+        return currentSize;
+    }
+
+    /**
+     * Records the latest size to report from {@link #getCurrentSize()}.
+     *
+     * @param queueSize the size to report
+     */
+    void setCurrentSize(final QueueSize queueSize) {
+        // the parameter is named queueSize; an unqualified 'currentSize' here would be the field itself
+        this.currentSize = queueSize;
+    }
+
+    @Override
+    public DropFlowFileState getState() {
+        return state;
+    }
+
+    @Override
+    public long getLastUpdated() {
+        return lastUpdated;
+    }
+
+    /**
+     * Moves the request to the given state and stamps the time of the change.
+     *
+     * @param state the new state
+     */
+    synchronized void setState(final DropFlowFileState state) {
+        this.state = state;
+        this.lastUpdated = System.currentTimeMillis();
+    }
+
+    /**
+     * @param thread the thread that {@link #cancel()} should interrupt
+     */
+    void setExecutionThread(final Thread thread) {
+        this.executionThread = thread;
+    }
+
+    /**
+     * Cancels the request unless it has already completed or been canceled,
+     * interrupting the execution thread if one has been registered.
+     *
+     * @return <code>true</code> if this call canceled the request, <code>false</code>
+     *         if it was already COMPLETE or CANCELED
+     */
+    synchronized boolean cancel() {
+        if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
+            return false;
+        }
+
+        this.state = DropFlowFileState.CANCELED;
+        if (executionThread != null) {
+            executionThread.interrupt();
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index df356fd..073e5fb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -24,9 +25,13 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,25 +40,32 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.RepositoryRecordType;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.concurrency.TimedLock;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,15 +79,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
     public static final int SWAP_RECORD_POLL_SIZE = 10000;
 
-    // When we have very high contention on a FlowFile Queue, the writeLock 
quickly becomes the bottleneck. In order to avoid this,
-    // we keep track of how often we are obtaining the write lock. If we 
exceed some threshold, we start performing a Pre-fetch so that
-    // we can then poll many times without having to obtain the lock.
-    // If lock obtained an average of more than PREFETCH_POLL_THRESHOLD times 
per second in order to poll from queue for last 5 seconds, do a pre-fetch.
-    public static final int PREFETCH_POLL_THRESHOLD = 1000;
-    public static final int PRIORITIZED_PREFETCH_SIZE = 10;
-    public static final int UNPRIORITIZED_PREFETCH_SIZE = 1000;
-    private volatile int prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; // when 
we pre-fetch, how many should we pre-fetch?
-
     private static final Logger logger = 
LoggerFactory.getLogger(StandardFlowFileQueue.class);
 
     private PriorityQueue<FlowFileRecord> activeQueue = null;
@@ -97,9 +100,13 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     private final List<FlowFilePrioritizer> priorities;
     private final int swapThreshold;
     private final FlowFileSwapManager swapManager;
+    private final List<String> swapLocations = new ArrayList<>();
     private final TimedLock readLock;
     private final TimedLock writeLock;
     private final String identifier;
+    private final FlowFileRepository flowFileRepository;
+    private final ProvenanceEventRepository provRepository;
+    private final ResourceClaimManager resourceClaimManager;
 
     private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
     private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
@@ -108,8 +115,8 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING 
SO WILL RESULT IN A DEADLOCK!
     private final ProcessScheduler scheduler;
 
-    public StandardFlowFileQueue(final String identifier, final Connection 
connection, final ProcessScheduler scheduler, final FlowFileSwapManager 
swapManager, final EventReporter eventReporter,
-        final int swapThreshold) {
+    public StandardFlowFileQueue(final String identifier, final Connection 
connection, final FlowFileRepository flowFileRepo, final 
ProvenanceEventRepository provRepo,
+        final ResourceClaimManager resourceClaimManager, final 
ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final 
EventReporter eventReporter, final int swapThreshold) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new 
ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
         maximumQueueObjectCount = 0L;
@@ -120,6 +127,9 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         swapQueue = new ArrayList<>();
         this.eventReporter = eventReporter;
         this.swapManager = swapManager;
+        this.flowFileRepository = flowFileRepo;
+        this.provRepository = provRepo;
+        this.resourceClaimManager = resourceClaimManager;
 
         this.identifier = identifier;
         this.swapThreshold = swapThreshold;
@@ -141,11 +151,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     }
 
     @Override
-    public int getSwapThreshold() {
-        return swapThreshold;
-    }
-
-    @Override
     public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
         writeLock.lock();
         try {
@@ -154,12 +159,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
             activeQueue = newQueue;
             priorities.clear();
             priorities.addAll(newPriorities);
-
-            if (newPriorities.isEmpty()) {
-                prefetchSize = UNPRIORITIZED_PREFETCH_SIZE;
-            } else {
-                prefetchSize = PRIORITIZED_PREFETCH_SIZE;
-            }
         } finally {
             writeLock.unlock("setPriorities");
         }
@@ -225,33 +224,16 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
      */
     private QueueSize getQueueSize() {
         final QueueSize unacknowledged = unacknowledgedSizeRef.get();
-        final PreFetch preFetch = preFetchRef.get();
 
-        final int preFetchCount;
-        final long preFetchSize;
-        if (preFetch == null) {
-            preFetchCount = 0;
-            preFetchSize = 0L;
-        } else {
-            final QueueSize preFetchQueueSize = preFetch.size();
-            preFetchCount = preFetchQueueSize.getObjectCount();
-            preFetchSize = preFetchQueueSize.getByteCount();
-        }
-
-        return new QueueSize(activeQueue.size() + swappedRecordCount + 
unacknowledged.getObjectCount() + preFetchCount,
-            activeQueueContentSize + swappedContentSize + 
unacknowledged.getByteCount() + preFetchSize);
+        return new QueueSize(activeQueue.size() + swappedRecordCount + 
unacknowledged.getObjectCount(),
+            activeQueueContentSize + swappedContentSize + 
unacknowledged.getByteCount());
     }
 
     @Override
     public boolean isEmpty() {
         readLock.lock();
         try {
-            final PreFetch prefetch = preFetchRef.get();
-            if (prefetch == null) {
-                return activeQueue.isEmpty() && swappedRecordCount == 0 && 
unacknowledgedSizeRef.get().getObjectCount() == 0;
-            } else {
-                return activeQueue.isEmpty() && swappedRecordCount == 0 && 
unacknowledgedSizeRef.get().getObjectCount() == 0 && 
prefetch.size().getObjectCount() == 0;
-            }
+            return activeQueue.isEmpty() && swappedRecordCount == 0 && 
unacknowledgedSizeRef.get().getObjectCount() == 0;
         } finally {
             readLock.unlock("isEmpty");
         }
@@ -260,30 +242,13 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     @Override
     public boolean isActiveQueueEmpty() {
         final int activeQueueSize = activeQueueSizeRef.get();
-        if (activeQueueSize == 0) {
-            final PreFetch preFetch = preFetchRef.get();
-            if (preFetch == null) {
-                return true;
-            }
-
-            final QueueSize queueSize = preFetch.size();
-            return queueSize.getObjectCount() == 0;
-        } else {
-            return false;
-        }
+        return activeQueueSize == 0;
     }
 
-    @Override
     public QueueSize getActiveQueueSize() {
         readLock.lock();
         try {
-            final PreFetch preFetch = preFetchRef.get();
-            if (preFetch == null) {
-                return new QueueSize(activeQueue.size(), 
activeQueueContentSize);
-            } else {
-                final QueueSize preFetchSize = preFetch.size();
-                return new QueueSize(activeQueue.size() + 
preFetchSize.getObjectCount(), activeQueueContentSize + 
preFetchSize.getByteCount());
-            }
+            return new QueueSize(activeQueue.size(), activeQueueContentSize);
         } finally {
             readLock.unlock("getActiveQueueSize");
         }
@@ -374,6 +339,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                 swappedContentSize += file.getSize();
                 swappedRecordCount++;
                 swapMode = true;
+                writeSwapFilesIfNecessary();
             } else {
                 activeQueueContentSize += file.getSize();
                 activeQueue.add(file);
@@ -405,6 +371,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                 swappedContentSize += bytes;
                 swappedRecordCount += numFiles;
                 swapMode = true;
+                writeSwapFilesIfNecessary();
             } else {
                 activeQueueContentSize += bytes;
                 activeQueue.addAll(files);
@@ -421,116 +388,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         }
     }
 
-    @Override
-    public List<FlowFileRecord> pollSwappableRecords() {
-        writeLock.lock();
-        try {
-            if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
-                return null;
-            }
-
-            final List<FlowFileRecord> swapRecords = new 
ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
-            final Iterator<FlowFileRecord> itr = swapQueue.iterator();
-            while (itr.hasNext() && swapRecords.size() < 
SWAP_RECORD_POLL_SIZE) {
-                final FlowFileRecord record = itr.next();
-                swapRecords.add(record);
-                itr.remove();
-            }
-
-            swapQueue.trimToSize();
-            return swapRecords;
-        } finally {
-            writeLock.unlock("pollSwappableRecords");
-        }
-    }
-
-    @Override
-    public void putSwappedRecords(final Collection<FlowFileRecord> records) {
-        writeLock.lock();
-        try {
-            try {
-                for (final FlowFileRecord record : records) {
-                    swappedContentSize -= record.getSize();
-                    swappedRecordCount--;
-                    activeQueueContentSize += record.getSize();
-                    activeQueue.add(record);
-                }
-
-                if (swappedRecordCount > swapQueue.size()) {
-                    // we have more swap files to be swapped in.
-                    return;
-                }
-
-                // If a call to #pollSwappableRecords will not produce any, go 
ahead and roll those FlowFiles back into the mix
-                if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
-                    for (final FlowFileRecord record : swapQueue) {
-                        activeQueue.add(record);
-                        activeQueueContentSize += record.getSize();
-                    }
-                    swapQueue.clear();
-                    swappedContentSize = 0L;
-                    swappedRecordCount = 0;
-                    swapMode = false;
-                }
-            } finally {
-                activeQueueSizeRef.set(activeQueue.size());
-            }
-        } finally {
-            writeLock.unlock("putSwappedRecords");
-            scheduler.registerEvent(connection.getDestination());
-        }
-    }
-
-    @Override
-    public void incrementSwapCount(final int numRecords, final long 
contentSize) {
-        writeLock.lock();
-        try {
-            swappedContentSize += contentSize;
-            swappedRecordCount += numRecords;
-        } finally {
-            writeLock.unlock("incrementSwapCount");
-        }
-    }
-
-    @Override
-    public int unswappedSize() {
-        readLock.lock();
-        try {
-            return activeQueue.size() + 
unacknowledgedSizeRef.get().getObjectCount();
-        } finally {
-            readLock.unlock("unswappedSize");
-        }
-    }
-
-    @Override
-    public int getSwapRecordCount() {
-        readLock.lock();
-        try {
-            return swappedRecordCount;
-        } finally {
-            readLock.unlock("getSwapRecordCount");
-        }
-    }
-
-    @Override
-    public int getSwapQueueSize() {
-        readLock.lock();
-        try {
-            if (logger.isDebugEnabled()) {
-                final long byteToMbDivisor = 1024L * 1024L;
-                final QueueSize unacknowledged = unacknowledgedSizeRef.get();
-
-                logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap 
Queue={}/{} MB, Unacknowledged={}/{} MB",
-                    activeQueue.size(), activeQueueContentSize / 
byteToMbDivisor,
-                    swappedRecordCount, swappedContentSize / byteToMbDivisor,
-                    unacknowledged.getObjectCount(), 
unacknowledged.getByteCount() / byteToMbDivisor);
-            }
-
-            return swapQueue.size();
-        } finally {
-            readLock.unlock("getSwapQueueSize");
-        }
-    }
 
     private boolean isLaterThan(final Long maxAge) {
         if (maxAge == null) {
@@ -558,30 +415,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
 
         // First check if we have any records Pre-Fetched.
         final long expirationMillis = flowFileExpirationMillis.get();
-        final PreFetch preFetch = preFetchRef.get();
-        if (preFetch != null) {
-            if (preFetch.isExpired()) {
-                requeueExpiredPrefetch(preFetch);
-            } else {
-                while (true) {
-                    final FlowFileRecord next = preFetch.nextRecord();
-                    if (next == null) {
-                        break;
-                    }
-
-                    if (isLaterThan(getExpirationDate(next, 
expirationMillis))) {
-                        expiredRecords.add(next);
-                        continue;
-                    }
-
-                    updateUnacknowledgedSize(1, next.getSize());
-                    return next;
-                }
-
-                preFetchRef.compareAndSet(preFetch, null);
-            }
-        }
-
         writeLock.lock();
         try {
             flowFile = doPoll(expiredRecords, expirationMillis);
@@ -631,9 +464,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
             queueFullRef.set(determineIfFull());
         }
 
-        if (incrementPollCount()) {
-            prefetch();
-        }
         return isExpired ? null : flowFile;
     }
 
@@ -642,38 +472,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, 
maxResults));
 
         // First check if we have any records Pre-Fetched.
-        final long expirationMillis = flowFileExpirationMillis.get();
-        final PreFetch preFetch = preFetchRef.get();
-        if (preFetch != null) {
-            if (preFetch.isExpired()) {
-                requeueExpiredPrefetch(preFetch);
-            } else {
-                long totalSize = 0L;
-                for (int i = 0; i < maxResults; i++) {
-                    final FlowFileRecord next = preFetch.nextRecord();
-                    if (next == null) {
-                        break;
-                    }
-
-                    if (isLaterThan(getExpirationDate(next, 
expirationMillis))) {
-                        expiredRecords.add(next);
-                        continue;
-                    }
-
-                    records.add(next);
-                    totalSize += next.getSize();
-                }
-
-                // If anything was prefetched, use what we have.
-                if (!records.isEmpty()) {
-                    updateUnacknowledgedSize(records.size(), totalSize);
-                    return records;
-                }
-
-                preFetchRef.compareAndSet(preFetch, null);
-            }
-        }
-
         writeLock.lock();
         try {
             doPoll(records, maxResults, expiredRecords);
@@ -705,10 +503,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         if (queueFullAtStart && !expiredRecords.isEmpty()) {
             queueFullRef.set(determineIfFull());
         }
-
-        if (incrementPollCount()) {
-            prefetch();
-        }
     }
 
     /**
@@ -732,6 +526,46 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         // Swap Queue to the Active Queue. However, we don't do this if there 
are FlowFiles already swapped out
         // to disk, because we want them to be swapped back in in the same 
order that they were swapped out.
 
+        if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
+
+        // If there are swap files waiting to be swapped in, swap those in 
first. We do this in order to ensure that those that
+        // were swapped out first are then swapped back in first. If we 
instead just immediately migrated the FlowFiles from the
+        // swap queue to the active queue, and we never run out of FlowFiles 
in the active queue (because destination cannot
+        // keep up with queue), we will end up always processing the new 
FlowFiles first instead of the FlowFiles that arrived
+        // first.
+        if (!swapLocations.isEmpty()) {
+            final String swapLocation = swapLocations.remove(0);
+            try {
+                final List<FlowFileRecord> swappedIn = 
swapManager.swapIn(swapLocation, this);
+                swappedRecordCount -= swappedIn.size();
+                long swapSize = 0L;
+                for (final FlowFileRecord flowFile : swappedIn) {
+                    swapSize += flowFile.getSize();
+                }
+                swappedContentSize -= swapSize;
+                activeQueueContentSize += swapSize;
+                activeQueueSizeRef.set(activeQueue.size());
+                activeQueue.addAll(swappedIn);
+                return;
+            } catch (final FileNotFoundException fnfe) {
+                logger.error("Failed to swap in FlowFiles from Swap File {} 
because the Swap File can no longer be found", swapLocation);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Swap File", 
"Failed to swap in FlowFiles from Swap File " + swapLocation + " because the 
Swap File can no longer be found");
+                }
+                return;
+            } catch (final IOException ioe) {
+                logger.error("Failed to swap in FlowFiles from Swap File {}; 
Swap File appears to be corrupt!", swapLocation);
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Swap File", 
"Failed to swap in FlowFiles from Swap File " +
+                        swapLocation + "; Swap File appears to be corrupt! 
Some FlowFiles in the queue may not be accessible. See logs for more 
information.");
+                }
+                return;
+            }
+        }
+
         // this is the most common condition (nothing is swapped out), so do 
the check first and avoid the expense
         // of other checks for 99.999% of the cases.
         if (swappedRecordCount == 0 && swapQueue.isEmpty()) {
@@ -760,6 +594,69 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         }
     }
 
+    /**
+     * This method MUST be called with the write lock held
+     */
+    private void writeSwapFilesIfNecessary() {
+        if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
+
+        final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
+
+        // Create a new Priority queue with the prioritizers that are set, but 
reverse the
+        // prioritizers because we want to pull the lowest-priority FlowFiles 
to swap out
+        final PriorityQueue<FlowFileRecord> tempQueue = new 
PriorityQueue<>(activeQueue.size() + swapQueue.size(), 
Collections.reverseOrder(new Prioritizer(priorities)));
+        tempQueue.addAll(activeQueue);
+        tempQueue.addAll(swapQueue);
+
+        final List<String> swapLocations = new ArrayList<>(numSwapFiles);
+        for (int i = 0; i < numSwapFiles; i++) {
+            // Create a new swap file for the next SWAP_RECORD_POLL_SIZE 
records
+            final List<FlowFileRecord> toSwap = new 
ArrayList<>(SWAP_RECORD_POLL_SIZE);
+            for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
+                toSwap.add(tempQueue.poll());
+            }
+
+            try {
+                Collections.reverse(toSwap); // currently ordered in reverse 
priority order based on the ordering of the temp queue.
+                final String swapLocation = swapManager.swapOut(toSwap, this);
+                swapLocations.add(swapLocation);
+            } catch (final IOException ioe) {
+                tempQueue.addAll(toSwap); // if we failed, we must add the 
FlowFiles back to the queue.
+                logger.error("FlowFile Queue with identifier {} has {} 
FlowFiles queued up. Attempted to spill FlowFile information over to disk in 
order to avoid exhausting "
+                    + "the Java heap space but failed to write information to 
disk due to {}", getIdentifier(), getQueueSize().getObjectCount(), 
ioe.toString());
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Failed to 
Overflow to Disk", "Flowfile Queue with identifier " + getIdentifier() + " has 
" + getQueueSize().getObjectCount() +
+                        " queued up. Attempted to spill FlowFile information 
over to disk in order to avoid exhausting the Java heap space but failed to 
write information to disk. "
+                        + "See logs for more information.");
+                }
+
+                break;
+            }
+        }
+
+        // Pull any records off of the temp queue that won't fit back on the 
active queue, and add those to the
+        // swap queue. Then add the records back to the active queue.
+        swapQueue.clear();
+        while (tempQueue.size() > swapThreshold) {
+            final FlowFileRecord record = tempQueue.poll();
+            swapQueue.add(record);
+        }
+
+        Collections.reverse(swapQueue); // currently ordered in reverse 
priority order based on the ordering of the temp queue
+
+        // replace the contents of the active queue, since we've merged it 
with the swap queue.
+        activeQueue.clear();
+        FlowFileRecord toRequeue;
+        while ((toRequeue = tempQueue.poll()) != null) {
+            activeQueue.offer(toRequeue);
+        }
+        this.swapLocations.addAll(swapLocations);
+    }
+
+
     @Override
     public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final 
List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> 
expiredRecords) {
         long drainedSize = 0L;
@@ -796,13 +693,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
             final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
             final List<FlowFileRecord> unselected = new ArrayList<>();
 
-            // the prefetch doesn't allow us to add records back. So when this 
method is used,
-            // if there are prefetched records, we have to requeue them into 
the active queue first.
-            final PreFetch prefetch = preFetchRef.get();
-            if (prefetch != null) {
-                requeueExpiredPrefetch(prefetch);
-            }
-
             while (true) {
                 FlowFileRecord flowFile = this.activeQueue.poll();
                 if (flowFile == null) {
@@ -856,6 +746,8 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         }
     }
 
+
+
     private static final class Prioritizer implements 
Comparator<FlowFileRecord>, Serializable {
 
         private static final long serialVersionUID = 1L;
@@ -991,6 +883,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
 
             this.swappedRecordCount = swapFlowFileCount;
             this.swappedContentSize = swapByteCount;
+            this.swapLocations.addAll(swapLocations);
         } finally {
             writeLock.unlock("Recover Swap Files");
         }
@@ -1004,173 +897,219 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         return "FlowFileQueue[id=" + identifier + "]";
     }
 
-    @Override
-    public DropFlowFileStatus dropFlowFiles() {
-        // TODO Auto-generated method stub
-        return null;
-    }
+    private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = 
new ConcurrentHashMap<>();
 
     @Override
-    public boolean cancelDropFlowFileRequest(String requestIdentifier) {
-        // TODO Auto-generated method stub
-        return false;
-    }
+    public DropFlowFileStatus dropFlowFiles() {
+        // purge any old requests from the map just to keep it clean. But if 
there are very few requests, which is usually the case, then don't bother
+        if (dropRequestMap.size() > 10) {
+            final List<String> toDrop = new ArrayList<>();
+            for (final Map.Entry<String, DropFlowFileRequest> entry : 
dropRequestMap.entrySet()) {
+                final DropFlowFileRequest request = entry.getValue();
+                final boolean completed = request.getState() == 
DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
+
+                if (completed && System.currentTimeMillis() - 
request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+                    toDrop.add(entry.getKey());
+                }
+            }
 
-    @Override
-    public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) {
-        // TODO Auto-generated method stub
-        return null;
-    }
+            for (final String requestId : toDrop) {
+                dropRequestMap.remove(requestId);
+            }
+        }
 
-    /**
-     * Lock the queue so that other threads are unable to interact with the
-     * queue
-     */
-    public void lock() {
-        writeLock.lock();
-    }
+        // TODO: get user name!
+        final String userName = null;
 
-    /**
-     * Unlock the queue
-     */
-    public void unlock() {
-        writeLock.unlock("external unlock");
-    }
+        final String requestIdentifier = UUID.randomUUID().toString();
+        final DropFlowFileRequest dropRequest = new 
DropFlowFileRequest(requestIdentifier);
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                writeLock.lock();
+                try {
+                    
dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
+
+                    try {
+                        final List<FlowFileRecord> activeQueueRecords = new 
ArrayList<>(activeQueue);
+                        drop(activeQueueRecords, userName);
+                        activeQueue.clear();
+                        dropRequest.setCurrentSize(getQueueSize());
+
+                        drop(swapQueue, userName);
+                        swapQueue.clear();
+                        dropRequest.setCurrentSize(getQueueSize());
+
+                        final Iterator<String> swapLocationItr = 
swapLocations.iterator();
+                        while (swapLocationItr.hasNext()) {
+                            final String swapLocation = swapLocationItr.next();
+                            final List<FlowFileRecord> swappedIn = 
swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
+                            try {
+                                drop(swappedIn, userName);
+                            } catch (final Exception e) {
+                                activeQueue.addAll(swappedIn); // ensure that 
we don't lose the FlowFiles from our queue.
+                                throw e;
+                            }
+
+                            dropRequest.setCurrentSize(getQueueSize());
+                            swapLocationItr.remove();
+                        }
 
-    @Override
-    public QueueSize getUnacknowledgedQueueSize() {
-        return unacknowledgedSizeRef.get();
-    }
+                        dropRequest.setState(DropFlowFileState.COMPLETE);
+                    } catch (final Exception e) {
+                        // TODO: Handle adequately
+                        dropRequest.setState(DropFlowFileState.FAILURE);
+                    }
+                } finally {
+                    writeLock.unlock("Drop FlowFiles");
+                }
+            }
+        }, "Drop FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
 
-    private void updateUnacknowledgedSize(final int addToCount, final long 
addToSize) {
-        boolean updated = false;
+        dropRequest.setExecutionThread(t);
+        dropRequestMap.put(requestIdentifier, dropRequest);
 
-        do {
-            final QueueSize queueSize = unacknowledgedSizeRef.get();
-            final QueueSize newSize = new QueueSize(queueSize.getObjectCount() 
+ addToCount, queueSize.getByteCount() + addToSize);
-            updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
-        } while (!updated);
+        return dropRequest;
     }
 
-    private void requeueExpiredPrefetch(final PreFetch prefetch) {
-        if (prefetch == null) {
-            return;
+    private void drop(final List<FlowFileRecord> flowFiles, final String user) 
throws IOException {
+        // Create a Provenance Event and a FlowFile Repository record for each 
FlowFile
+        final List<ProvenanceEventRecord> provenanceEvents = new 
ArrayList<>(flowFiles.size());
+        final List<RepositoryRecord> flowFileRepoRecords = new 
ArrayList<>(flowFiles.size());
+        for (final FlowFileRecord flowFile : flowFiles) {
+            provenanceEvents.add(createDropEvent(flowFile, user));
+            flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
         }
 
-        writeLock.lock();
-        try {
-            final long contentSizeRequeued = prefetch.requeue(activeQueue);
-            this.activeQueueContentSize += contentSizeRequeued;
-            this.preFetchRef.compareAndSet(prefetch, null);
-        } finally {
-            writeLock.unlock("requeueExpiredPrefetch");
-        }
-    }
+        for (final FlowFileRecord flowFile : flowFiles) {
+            final ContentClaim contentClaim = flowFile.getContentClaim();
+            if (contentClaim == null) {
+                continue;
+            }
 
-    /**
-     * MUST be called with write lock held.
-     */
-    private final AtomicReference<PreFetch> preFetchRef = new 
AtomicReference<>();
+            final ResourceClaim resourceClaim = 
contentClaim.getResourceClaim();
+            if (resourceClaim == null) {
+                continue;
+            }
 
-    private void prefetch() {
-        if (activeQueue.isEmpty()) {
-            return;
+            resourceClaimManager.decrementClaimantCount(resourceClaim);
         }
 
-        final int numToFetch = Math.min(prefetchSize, activeQueue.size());
+        provRepository.registerEvents(provenanceEvents);
+        flowFileRepository.updateRepository(flowFileRepoRecords);
+    }
 
-        final PreFetch curPreFetch = preFetchRef.get();
-        if (curPreFetch != null && curPreFetch.size().getObjectCount() > 0) {
-            return;
-        }
+    private ProvenanceEventRecord createDropEvent(final FlowFileRecord 
flowFile, final String user) {
+        final ProvenanceEventBuilder builder = provRepository.eventBuilder();
+        builder.fromFlowFile(flowFile);
+        builder.setEventType(ProvenanceEventType.DROP);
+        builder.setLineageStartDate(flowFile.getLineageStartDate());
+        builder.setComponentId(getIdentifier());
+        builder.setComponentType("Connection");
+        builder.setDetails("FlowFile manually dropped by user " + user);
+        return builder.build();
+    }
 
-        final List<FlowFileRecord> buffer = new ArrayList<>(numToFetch);
-        long contentSize = 0L;
-        for (int i = 0; i < numToFetch; i++) {
-            final FlowFileRecord record = activeQueue.poll();
-            if (record == null || record.isPenalized()) {
-                // not enough unpenalized records to pull. Put all records 
back and return
-                activeQueue.addAll(buffer);
-                if (record != null) {
-                    activeQueue.add(record);
-                }
-                return;
-            } else {
-                buffer.add(record);
-                contentSize += record.getSize();
+    private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord 
flowFile) {
+        return new RepositoryRecord() {
+            @Override
+            public FlowFileQueue getDestination() {
+                return null;
             }
-        }
 
-        activeQueueContentSize -= contentSize;
-        preFetchRef.set(new PreFetch(buffer));
-    }
+            @Override
+            public FlowFileQueue getOriginalQueue() {
+                return StandardFlowFileQueue.this;
+            }
 
-    private final TimedBuffer<TimestampedLong> pollCounts = new 
TimedBuffer<>(TimeUnit.SECONDS, 5, new LongEntityAccess());
+            @Override
+            public RepositoryRecordType getType() {
+                return RepositoryRecordType.DELETE;
+            }
 
-    private boolean incrementPollCount() {
-        pollCounts.add(new TimestampedLong(1L));
-        final long totalCount = 
pollCounts.getAggregateValue(System.currentTimeMillis() - 5000L).getValue();
-        return totalCount > PREFETCH_POLL_THRESHOLD * 5;
-    }
+            @Override
+            public ContentClaim getCurrentClaim() {
+                return flowFile.getContentClaim();
+            }
 
-    private static class PreFetch {
+            @Override
+            public ContentClaim getOriginalClaim() {
+                return flowFile.getContentClaim();
+            }
 
-        private final List<FlowFileRecord> records;
-        private final AtomicInteger pointer = new AtomicInteger(0);
-        private final long expirationTime = System.nanoTime() + 
TimeUnit.SECONDS.toNanos(1L);
-        private final AtomicLong contentSize = new AtomicLong(0L);
+            @Override
+            public long getCurrentClaimOffset() {
+                return flowFile.getContentClaimOffset();
+            }
 
-        public PreFetch(final List<FlowFileRecord> records) {
-            this.records = records;
+            @Override
+            public FlowFileRecord getCurrent() {
+                return flowFile;
+            }
 
-            long totalSize = 0L;
-            for (final FlowFileRecord record : records) {
-                totalSize += record.getSize();
+            @Override
+            public boolean isAttributesChanged() {
+                return false;
             }
-            contentSize.set(totalSize);
-        }
 
-        public FlowFileRecord nextRecord() {
-            final int nextValue = pointer.getAndIncrement();
-            if (nextValue >= records.size()) {
+            @Override
+            public boolean isMarkedForAbort() {
+                return false;
+            }
+
+            @Override
+            public String getSwapLocation() {
                 return null;
             }
+        };
+    }
 
-            final FlowFileRecord flowFile = records.get(nextValue);
-            contentSize.addAndGet(-flowFile.getSize());
-            return flowFile;
+
+    @Override
+    public boolean cancelDropFlowFileRequest(final String requestIdentifier) {
+        final DropFlowFileRequest request = 
dropRequestMap.remove(requestIdentifier);
+        if (request == null) {
+            return false;
         }
 
-        public QueueSize size() {
-            final int pointerIndex = pointer.get();
-            final int count = records.size() - pointerIndex;
-            if (count < 0) {
-                return new QueueSize(0, 0L);
-            }
+        final boolean successful = request.cancel();
+        return successful;
+    }
 
-            final long bytes = contentSize.get();
-            return new QueueSize(count, bytes);
-        }
+    @Override
+    public DropFlowFileStatus getDropFlowFileStatus(final String 
requestIdentifier) {
+        return dropRequestMap.get(requestIdentifier);
+    }
 
-        public boolean isExpired() {
-            return System.nanoTime() > expirationTime;
-        }
+    /**
+     * Lock the queue so that other threads are unable to interact with the
+     * queue
+     */
+    public void lock() {
+        writeLock.lock();
+    }
 
-        private long requeue(final Queue<FlowFileRecord> queue) {
-            // get the current pointer and prevent any other thread from 
accessing the rest of the elements
-            final int curPointer = pointer.getAndAdd(records.size());
-            if (curPointer < records.size() - 1) {
-                final List<FlowFileRecord> subList = 
records.subList(curPointer, records.size());
-                long contentSize = 0L;
-                for (final FlowFileRecord record : subList) {
-                    contentSize += record.getSize();
-                }
+    /**
+     * Unlock the queue
+     */
+    public void unlock() {
+        writeLock.unlock("external unlock");
+    }
 
-                queue.addAll(subList);
+    @Override
+    public QueueSize getUnacknowledgedQueueSize() {
+        return unacknowledgedSizeRef.get();
+    }
 
-                return contentSize;
-            }
-            return 0L;
-        }
+    private void updateUnacknowledgedSize(final int addToCount, final long 
addToSize) {
+        boolean updated = false;
+
+        do {
+            final QueueSize queueSize = unacknowledgedSizeRef.get();
+            final QueueSize newSize = new QueueSize(queueSize.getObjectCount() 
+ addToCount, queueSize.getByteCount() + addToSize);
+            updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
+        } while (!updated);
     }
 }

Reply via email to