http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index c6fbf28..64e49c3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -16,30 +16,20 @@
  */
 package org.apache.nifi.controller;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import org.apache.nifi.attribute.expression.language.PreparedQuery;
+import org.apache.nifi.attribute.expression.language.Query;
+import 
org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileSummary;
+import org.apache.nifi.controller.queue.ListFlowFileRequest;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -49,8 +39,10 @@ import 
org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
@@ -65,6 +57,27 @@ import org.apache.nifi.util.concurrency.TimedLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * A FlowFileQueue is used to queue FlowFile objects that are awaiting further
  * processing. Must be thread safe.
@@ -82,7 +95,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     // guarded by lock
     private ArrayList<FlowFileRecord> swapQueue = null;
 
-    private final AtomicReference<FlowFileQueueSize> size = new 
AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
+    private final AtomicReference<FlowFileQueueSize> size = new 
AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0, 0L));
 
     private boolean swapMode = false;
 
@@ -96,7 +109,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     private final int swapThreshold;
     private final FlowFileSwapManager swapManager;
     private final List<String> swapLocations = new ArrayList<>();
-    @SuppressWarnings("unused")
     private final TimedLock readLock;
     private final TimedLock writeLock;
     private final String identifier;
@@ -104,6 +116,9 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     private final ProvenanceEventRepository provRepository;
     private final ResourceClaimManager resourceClaimManager;
 
+    private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = 
new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = 
new ConcurrentHashMap<>();
+
     // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING 
SO WILL RESULT IN A DEADLOCK!
     private final ProcessScheduler scheduler;
 
@@ -263,7 +278,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         try {
             if (swapMode || activeQueue.size() >= swapThreshold) {
                 swapQueue.add(file);
-                incrementSwapQueueSize(1, file.getSize());
+                incrementSwapQueueSize(1, file.getSize(), 0);
                 swapMode = true;
                 writeSwapFilesIfNecessary();
             } else {
@@ -291,7 +306,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         try {
             if (swapMode || activeQueue.size() >= swapThreshold - numFiles) {
                 swapQueue.addAll(files);
-                incrementSwapQueueSize(numFiles, bytes);
+                incrementSwapQueueSize(numFiles, bytes, 0);
                 swapMode = true;
                 writeSwapFilesIfNecessary();
             } else {
@@ -450,7 +465,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                 for (final FlowFileRecord flowFile : swappedIn) {
                     swapSize += flowFile.getSize();
                 }
-                incrementSwapQueueSize(-swappedIn.size(), -swapSize);
+                incrementSwapQueueSize(-swappedIn.size(), -swapSize, -1);
                 incrementActiveQueueSize(swappedIn.size(), swapSize);
                 activeQueue.addAll(swappedIn);
                 return;
@@ -496,7 +511,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
 
         if (recordsMigrated > 0) {
             incrementActiveQueueSize(recordsMigrated, bytesMigrated);
-            incrementSwapQueueSize(-recordsMigrated, -bytesMigrated);
+            incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0);
         }
 
         if (size.get().swappedCount == 0) {
@@ -587,7 +602,9 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
             final long addedSwapBytes = updatedSwapQueueBytes - 
originalSwapQueueBytes;
 
             final FlowFileQueueSize newSize = new 
FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
-                originalSize.swappedCount + addedSwapRecords + 
flowFilesSwappedOut, originalSize.swappedBytes + addedSwapBytes + 
bytesSwappedOut,
+                originalSize.swappedCount + addedSwapRecords + 
flowFilesSwappedOut,
+                originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut,
+                originalSize.swapFiles + numSwapFiles,
                 originalSize.unacknowledgedCount, 
originalSize.unacknowledgedBytes);
             updated = size.compareAndSet(originalSize, newSize);
         }
@@ -817,7 +834,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                 }
             }
 
-            incrementSwapQueueSize(swapFlowFileCount, swapByteCount);
+            incrementSwapQueueSize(swapFlowFileCount, swapByteCount, 
swapLocations.size());
             this.swapLocations.addAll(swapLocations);
         } finally {
             writeLock.unlock("Recover Swap Files");
@@ -832,7 +849,275 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         return "FlowFileQueue[id=" + identifier + "]";
     }
 
-    private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = 
new ConcurrentHashMap<>();
+
+    @Override
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, 
final int maxResults) {
+        return listFlowFiles(requestIdentifier, maxResults, 
SortColumn.QUEUE_POSITION, SortDirection.ASCENDING);
+    }
+
+    @Override
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, 
final int maxResults, final SortColumn sortColumn, final SortDirection 
direction) {
+        return listFlowFiles(requestIdentifier, maxResults, null, sortColumn, 
direction);
+    }
+
+    @Override
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, 
final int maxResults, final String query, final SortColumn sortColumn, final 
SortDirection direction) {
+        final PreparedQuery preparedQuery;
+        if (query == null) {
+            preparedQuery = null;
+        } else {
+            try {
+                final ResultType resultType = 
Query.compile(query).getResultType();
+                if (resultType != ResultType.BOOLEAN) {
+                    throw new IllegalArgumentException("Invalid expression 
Language provided to search the listing of FlowFiles. "
+                        + "The expression must return a 'Boolean' type but 
returns a " + resultType.name() + " type");
+                }
+                preparedQuery = Query.prepare(query);
+            } catch (final AttributeExpressionLanguageParsingException e) {
+                throw new IllegalArgumentException("Invalid Expression 
Language provided to search the listing of FlowFiles: " + query, e);
+            }
+        }
+
+        // purge any old requests from the map just to keep it clean. But if 
there are very few requests, which is usually the case, then don't bother
+        if (listRequestMap.size() > 10) {
+            final List<String> toDrop = new ArrayList<>();
+            for (final Map.Entry<String, ListFlowFileRequest> entry : 
listRequestMap.entrySet()) {
+                final ListFlowFileRequest request = entry.getValue();
+                final boolean completed = request.getState() == 
ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
+
+                if (completed && System.currentTimeMillis() - 
request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+                    toDrop.add(entry.getKey());
+                }
+            }
+
+            for (final String requestId : toDrop) {
+                listRequestMap.remove(requestId);
+            }
+        }
+
+        // numSteps = 1 for each swap location + 1 for active queue + 1 for 
swap queue.
+        final int numSteps = 2 + size.get().swapFiles;
+        final ListFlowFileRequest listRequest = new 
ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, 
size(), numSteps);
+
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int position = 0;
+                int resultCount = 0;
+                final Comparator<FlowFileSummary> comparator = 
FlowFileSummaries.createComparator(sortColumn, direction);
+                final NavigableSet<FlowFileSummary> summaries = new 
TreeSet<>(comparator);
+                int completedStepCount = 0;
+
+                // we need a write lock while using the Active Queue because 
we can't iterate over it - we have to poll from it
+                // continually. This is because the iterator for PriorityQueue 
does not iterate over the elements in any particular
+                // order. Since we need the 'position' of the element in the 
queue, we need to iterate over them in the proper order.
+                writeLock.lock();
+                try {
+                    logger.debug("{} Acquired lock to perform listing of 
FlowFiles", StandardFlowFileQueue.this);
+                    listRequest.setState(ListFlowFileState.CALCULATING_LIST);
+                    final List<FlowFileRecord> flowFileRecords = new 
ArrayList<>(activeQueue.size());
+
+                    FlowFileRecord flowFile;
+                    try {
+                        while ((flowFile = activeQueue.poll()) != null) {
+                            flowFileRecords.add(flowFile);
+                            position++;
+
+                            if (preparedQuery == null || 
"true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                summaries.add(summarize(flowFile, position));
+                                if (summaries.size() > maxResults) {
+                                    summaries.pollLast();
+                                }
+                            }
+                        }
+                    } finally {
+                        activeQueue.addAll(flowFileRecords);
+                    }
+                } finally {
+                    writeLock.unlock("List FlowFiles");
+                }
+
+                logger.debug("{} Finished listing FlowFiles for active queue 
with a total of {} results", StandardFlowFileQueue.this, resultCount);
+
+                listRequest.setCompletedStepCount(++completedStepCount);
+
+                position = activeQueue.size();
+                try {
+                    // We are now iterating over swap files, and we don't need 
the write lock for this, just the read lock, since
+                    // we are not modifying anything.
+                    readLock.lock();
+                    try {
+                        for (final String location : swapLocations) {
+                            logger.debug("{} Performing listing of FlowFiles 
for Swap Location {}", StandardFlowFileQueue.this, location);
+                            final List<FlowFileRecord> flowFiles = 
swapManager.peek(location, StandardFlowFileQueue.this);
+                            for (final FlowFileRecord flowFile : flowFiles) {
+                                position++;
+
+                                if (preparedQuery == null || 
"true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                    summaries.add(summarize(flowFile, 
position));
+                                    if (summaries.size() > maxResults) {
+                                        summaries.pollLast();
+                                    }
+                                }
+                            }
+
+                            
listRequest.setCompletedStepCount(++completedStepCount);
+                        }
+
+                        logger.debug("{} Performing listing of FlowFiles from 
Swap Queue", StandardFlowFileQueue.this);
+                        for (final FlowFileRecord flowFile : swapQueue) {
+                            position++;
+
+                            if (preparedQuery == null || 
"true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                summaries.add(summarize(flowFile, position));
+                                if (summaries.size() > maxResults) {
+                                    summaries.pollLast();
+                                }
+                            }
+                        }
+
+                        
listRequest.setCompletedStepCount(++completedStepCount);
+                    } finally {
+                        readLock.unlock("List FlowFiles");
+                    }
+                } catch (final IOException ioe) {
+                    logger.error("Failed to read swapped FlowFiles in order to 
perform listing of queue " + StandardFlowFileQueue.this, ioe);
+                    listRequest.setFailure("Could not read FlowFiles from 
queue. Check log files for more details.");
+                }
+
+                // We have now completed the listing successfully. Set the 
number of completed steps to the total number of steps. We may have
+                // skipped some steps because we have reached the maximum 
number of results, so we consider those steps completed.
+                logger.debug("{} Completed listing of FlowFiles", 
StandardFlowFileQueue.this);
+                
listRequest.setCompletedStepCount(listRequest.getTotalStepCount());
+                listRequest.setState(ListFlowFileState.COMPLETE);
+                listRequest.setFlowFileSummaries(new 
ArrayList<FlowFileSummary>(summaries));
+            }
+        }, "List FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
+
+        listRequestMap.put(requestIdentifier, listRequest);
+        return listRequest;
+    }
+
+    private FlowFileSummary summarize(final FlowFile flowFile, final int 
position) {
+        // extract all of the information that we care about into new 
variables rather than just
+        // wrapping the FlowFile object with a FlowFileSummary object. We do 
this because we want to
+        // be able to hold many FlowFileSummary objects in memory and if we 
just wrap the FlowFile object,
+        // we will end up holding the entire FlowFile (including all 
Attributes) in the Java heap as well,
+        // which can be problematic if we expect them to be swapped out.
+        final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+        final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final long size = flowFile.getSize();
+        final Long lastQueuedTime = flowFile.getLastQueueDate();
+        final long lineageStart = flowFile.getLineageStartDate();
+        final boolean penalized = flowFile.isPenalized();
+
+        return new FlowFileSummary() {
+            @Override
+            public String getUuid() {
+                return uuid;
+            }
+
+            @Override
+            public String getFilename() {
+                return filename;
+            }
+
+            @Override
+            public int getPosition() {
+                return position;
+            }
+
+            @Override
+            public long getSize() {
+                return size;
+            }
+
+            @Override
+            public long getLastQueuedTime() {
+                return lastQueuedTime == null ? 0L : lastQueuedTime;
+            }
+
+            @Override
+            public long getLineageStartDate() {
+                return lineageStart;
+            }
+
+            @Override
+            public boolean isPenalized() {
+                return penalized;
+            }
+        };
+    }
+
+
+    @Override
+    public ListFlowFileStatus getListFlowFileStatus(final String 
requestIdentifier) {
+        return listRequestMap.get(requestIdentifier);
+    }
+
+    @Override
+    public ListFlowFileStatus cancelListFlowFileRequest(final String 
requestIdentifier) {
+        logger.info("Canceling ListFlowFile Request with ID {}", 
requestIdentifier);
+        final ListFlowFileRequest request = 
listRequestMap.remove(requestIdentifier);
+        if (request != null) {
+            request.cancel();
+        }
+
+        return request;
+    }
+
+    @Override
+    public FlowFileRecord getFlowFile(final String flowFileUuid) throws 
IOException {
+        if (flowFileUuid == null) {
+            return null;
+        }
+
+        readLock.lock();
+        try {
+            // read through all of the FlowFiles in the queue, looking for the 
FlowFile with the given ID
+            for (final FlowFileRecord flowFile : activeQueue) {
+                if 
(flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                    return flowFile;
+                }
+            }
+
+            for (final FlowFileRecord flowFile : swapQueue) {
+                if 
(flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                    return flowFile;
+                }
+            }
+
+            // TODO: consider using a Long flowFileId instead of a UUID, and 
then having the swap manager
+            // write out the min and max FlowFile ID's. This would allow us to 
then have a method: boolean isFlowFilePossiblyContained(long id)
+            // which can return a boolean value that can be used to determine 
whether or not to even call peek
+            for (final String swapLocation : swapLocations) {
+                final List<FlowFileRecord> flowFiles = 
swapManager.peek(swapLocation, this);
+                for (final FlowFileRecord flowFile : flowFiles) {
+                    if 
(flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                        return flowFile;
+                    }
+                }
+            }
+        } finally {
+            readLock.unlock("getFlowFile");
+        }
+
+        return null;
+    }
+
+
+    @Override
+    public void verifyCanList() throws IllegalStateException {
+        if (connection.getSource().isRunning()) {
+            throw new IllegalStateException("Cannot list the FlowFiles of 
queue because the connection's source is still running");
+        }
+
+        if (connection.getDestination().isRunning()) {
+            throw new IllegalStateException("Cannot list the FlowFiles of 
queue because the connection's destination is still running");
+        }
+    }
 
     @Override
     public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, 
final String requestor) {
@@ -899,16 +1184,15 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                         dropRequest.setCurrentSize(getQueueSize());
                         
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
 
-                        try {
-                            final QueueSize swapSize = 
size.get().swapQueueSize();
-
-                            logger.debug("For DropFlowFileRequest {}, Swap 
Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
-                                requestIdentifier, swapQueue.size(), 
swapSize.getObjectCount(), swapSize.getByteCount());
-                            if (dropRequest.getState() == 
DropFlowFileState.CANCELED) {
-                                logger.info("Cancel requested for 
DropFlowFileRequest {}", requestIdentifier);
-                                return;
-                            }
+                        final QueueSize swapSize = size.get().swapQueueSize();
+                        logger.debug("For DropFlowFileRequest {}, Swap Queue 
has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
+                            requestIdentifier, swapQueue.size(), 
swapSize.getObjectCount(), swapSize.getByteCount());
+                        if (dropRequest.getState() == 
DropFlowFileState.CANCELED) {
+                            logger.info("Cancel requested for 
DropFlowFileRequest {}", requestIdentifier);
+                            return;
+                        }
 
+                        try {
                             droppedSize = drop(swapQueue, requestor);
                         } catch (final IOException ioe) {
                             logger.error("Failed to drop the FlowFiles from 
queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), 
ioe.toString());
@@ -922,7 +1206,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                         dropRequest.setCurrentSize(getQueueSize());
                         
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
                         swapMode = false;
-                        incrementSwapQueueSize(-droppedSize.getObjectCount(), 
-droppedSize.getByteCount());
+                        incrementSwapQueueSize(-droppedSize.getObjectCount(), 
-droppedSize.getByteCount(), 0);
                         logger.debug("For DropFlowFileRequest {}, dropped {} 
from Swap Queue", requestIdentifier, droppedSize);
 
                         final int swapFileCount = swapLocations.size();
@@ -953,7 +1237,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                             }
 
                             
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
-                            
incrementSwapQueueSize(-droppedSize.getObjectCount(), 
-droppedSize.getByteCount());
+                            
incrementSwapQueueSize(-droppedSize.getObjectCount(), 
-droppedSize.getByteCount(), -1);
 
                             dropRequest.setCurrentSize(getQueueSize());
                             swapLocationItr.remove();
@@ -1129,7 +1413,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         while (!updated) {
             final FlowFileQueueSize original = size.get();
             final FlowFileQueueSize newSize = new 
FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes 
+ bytes,
-                original.swappedCount, original.swappedBytes, 
original.unacknowledgedCount, original.unacknowledgedBytes);
+                original.swappedCount, original.swappedBytes, 
original.swapFiles, original.unacknowledgedCount, original.unacknowledgedBytes);
             updated = size.compareAndSet(original, newSize);
 
             if (updated) {
@@ -1138,12 +1422,12 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         }
     }
 
-    private void incrementSwapQueueSize(final int count, final long bytes) {
+    private void incrementSwapQueueSize(final int count, final long bytes, 
final int fileCount) {
         boolean updated = false;
         while (!updated) {
             final FlowFileQueueSize original = size.get();
             final FlowFileQueueSize newSize = new 
FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
-                original.swappedCount + count, original.swappedBytes + bytes, 
original.unacknowledgedCount, original.unacknowledgedBytes);
+                original.swappedCount + count, original.swappedBytes + bytes, 
original.swapFiles + fileCount, original.unacknowledgedCount, 
original.unacknowledgedBytes);
             updated = size.compareAndSet(original, newSize);
 
             if (updated) {
@@ -1157,7 +1441,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         while (!updated) {
             final FlowFileQueueSize original = size.get();
             final FlowFileQueueSize newSize = new 
FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
-                original.swappedCount, original.swappedBytes, 
original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes);
+                original.swappedCount, original.swappedBytes, 
original.swapFiles, original.unacknowledgedCount + count, 
original.unacknowledgedBytes + bytes);
             updated = size.compareAndSet(original, newSize);
 
             if (updated) {
@@ -1181,15 +1465,17 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         private final long activeQueueBytes;
         private final int swappedCount;
         private final long swappedBytes;
+        private final int swapFiles;
         private final int unacknowledgedCount;
         private final long unacknowledgedBytes;
 
-        public FlowFileQueueSize(final int activeQueueCount, final long 
activeQueueBytes, final int swappedCount, final long swappedBytes,
+        public FlowFileQueueSize(final int activeQueueCount, final long 
activeQueueBytes, final int swappedCount, final long swappedBytes, final int 
swapFileCount,
             final int unacknowledgedCount, final long unacknowledgedBytes) {
             this.activeQueueCount = activeQueueCount;
             this.activeQueueBytes = activeQueueBytes;
             this.swappedCount = swappedCount;
             this.swappedBytes = swappedBytes;
+            this.swapFiles = swapFileCount;
             this.unacknowledgedCount = unacknowledgedCount;
             this.unacknowledgedBytes = unacknowledgedBytes;
         }
@@ -1218,7 +1504,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         public String toString() {
             return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + 
", " + activeQueueBytes +
                 " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes +
-                " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + 
unacknowledgedBytes + " Bytes] ]";
+                " Bytes], Swap Files=[" + swapFiles + "], Unacknowledged=[" + 
unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 639a4c8..211baa7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -110,7 +110,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     // We can then destroy the data. If we end up syncing the FlowFile 
Repository to the backing storage mechanism and then restart
     // before the data is destroyed, it's okay because the data will be 
unknown to the Content Repository, so it will be destroyed
     // on restart.
-    private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> 
claimsAwaitingDestruction = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> 
claimsAwaitingDestruction = new ConcurrentHashMap<>();
 
     public WriteAheadFlowFileRepository() {
         final NiFiProperties properties = NiFiProperties.getInstance();
@@ -169,12 +169,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         updateRepository(records, alwaysSync);
     }
 
-    private void markDestructable(final ContentClaim contentClaim) {
-        if (contentClaim == null) {
-            return;
-        }
-
-        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+    private void markDestructable(final ResourceClaim resourceClaim) {
         if (resourceClaim == null) {
             return;
         }
@@ -213,22 +208,22 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         // However, the result of this is that the FileSystem Repository may 
end up trying to remove the content multiple times.
         // This does not, however, cause problems, as ContentRepository should 
handle this
         // This does indicate that some refactoring should probably be 
performed, though, as this is not a very clean interface.
-        final Set<ContentClaim> claimsToAdd = new HashSet<>();
+        final Set<ResourceClaim> claimsToAdd = new HashSet<>();
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if current claim's 
claimant count <= 0, mark it as destructable
                 if (record.getCurrentClaim() != null && 
getClaimantCount(record.getCurrentClaim()) <= 0) {
-                    claimsToAdd.add(record.getCurrentClaim());
+                    
claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
                 }
 
                 // If the original claim is different than the current claim 
and the original claim has a claimant count <= 0, mark it as destructable.
                 if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimsToAdd.add(record.getOriginalClaim());
+                    
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
                 // if we have an update, and the original is no longer needed, 
mark original as destructable
                 if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimsToAdd.add(record.getOriginalClaim());
+                    
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
                 }
             }
         }
@@ -236,10 +231,10 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         if (!claimsToAdd.isEmpty()) {
             // Get / Register a Set<ContentClaim> for the given Partiton Index
             final Integer partitionKey = Integer.valueOf(partitionIndex);
-            BlockingQueue<ContentClaim> claimQueue = 
claimsAwaitingDestruction.get(partitionKey);
+            BlockingQueue<ResourceClaim> claimQueue = 
claimsAwaitingDestruction.get(partitionKey);
             if (claimQueue == null) {
                 claimQueue = new LinkedBlockingQueue<>();
-                final BlockingQueue<ContentClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
+                final BlockingQueue<ResourceClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
                 if (existingClaimQueue != null) {
                     claimQueue = existingClaimQueue;
                 }
@@ -252,26 +247,26 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
     @Override
     public void onSync(final int partitionIndex) {
-        final BlockingQueue<ContentClaim> claimQueue = 
claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
+        final BlockingQueue<ResourceClaim> claimQueue = 
claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
         if (claimQueue == null) {
             return;
         }
 
-        final Set<ContentClaim> claimsToDestroy = new HashSet<>();
+        final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
         claimQueue.drainTo(claimsToDestroy);
 
-        for (final ContentClaim claim : claimsToDestroy) {
+        for (final ResourceClaim claim : claimsToDestroy) {
             markDestructable(claim);
         }
     }
 
     @Override
     public void onGlobalSync() {
-        for (final BlockingQueue<ContentClaim> claimQueue : 
claimsAwaitingDestruction.values()) {
-            final Set<ContentClaim> claimsToDestroy = new HashSet<>();
+        for (final BlockingQueue<ResourceClaim> claimQueue : 
claimsAwaitingDestruction.values()) {
+            final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
             claimQueue.drainTo(claimsToDestroy);
 
-            for (final ContentClaim claim : claimsToDestroy) {
+            for (final ResourceClaim claim : claimsToDestroy) {
                 markDestructable(claim);
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 09ac7f2..2a46ec6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -17,29 +17,17 @@
 
 package org.apache.nifi.controller;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileSummary;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.SortDirection;
+import org.apache.nifi.controller.queue.SortColumn;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -55,17 +43,40 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 public class TestStandardFlowFileQueue {
     private TestSwapManager swapManager = null;
     private StandardFlowFileQueue queue = null;
 
     private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
 
+    @BeforeClass
+    public static void setupLogging() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", 
"DEBUG");
+    }
+
     @Before
     @SuppressWarnings("unchecked")
     public void setup() {
@@ -388,6 +399,167 @@ public class TestStandardFlowFileQueue {
         assertEquals(20, swapManager.swapInCalledCount);
     }
 
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesOnlyActiveQueue() throws InterruptedException 
{
+        for (int i = 0; i < 9999; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = 
queue.listFlowFiles(UUID.randomUUID().toString(), 10000);
+        assertNotNull(status);
+        assertEquals(9999, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(9999, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(2, status.getTotalStepCount());
+        assertEquals(2, status.getCompletedStepCount());
+    }
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesActiveQueueAndSwapQueue() throws 
InterruptedException {
+        for (int i = 0; i < 11000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = 
queue.listFlowFiles(UUID.randomUUID().toString(), 11000);
+        assertNotNull(status);
+        assertEquals(11000, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(11000, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(2, status.getTotalStepCount());
+        assertEquals(2, status.getCompletedStepCount());
+    }
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesActiveQueueAndSwapFile() throws 
InterruptedException {
+        for (int i = 0; i < 20000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = 
queue.listFlowFiles(UUID.randomUUID().toString(), 20000);
+        assertNotNull(status);
+        assertEquals(20000, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(20000, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(3, status.getTotalStepCount());
+        assertEquals(3, status.getCompletedStepCount());
+    }
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesActiveQueueAndSwapFilesAndSwapQueue() throws 
InterruptedException {
+        for (int i = 0; i < 30050; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = 
queue.listFlowFiles(UUID.randomUUID().toString(), 30050);
+        assertNotNull(status);
+        assertEquals(30050, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(30050, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(4, status.getTotalStepCount());
+        assertEquals(4, status.getCompletedStepCount());
+    }
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesResultsLimited() throws InterruptedException {
+        for (int i = 0; i < 30050; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        final ListFlowFileStatus status = 
queue.listFlowFiles(UUID.randomUUID().toString(), 100);
+        assertNotNull(status);
+        assertEquals(30050, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(100, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+        assertNull(status.getFailureReason());
+        assertEquals(4, status.getTotalStepCount());
+        assertEquals(4, status.getCompletedStepCount());
+    }
+
+    @Test
+    public void testListFlowFilesSortedAscending() throws InterruptedException 
{
+        for (int i = 0; i < 30050; i++) {
+            queue.put(new TestFlowFile(i));
+        }
+
+        final ListFlowFileStatus status = 
queue.listFlowFiles(UUID.randomUUID().toString(), 100, 
SortColumn.FLOWFILE_SIZE, SortDirection.ASCENDING);
+        assertNotNull(status);
+        assertEquals(30050, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(100, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+
+        assertNull(status.getFailureReason());
+        assertEquals(4, status.getTotalStepCount());
+        assertEquals(4, status.getCompletedStepCount());
+
+        int counter = 0;
+        for (final FlowFileSummary summary : status.getFlowFileSummaries()) {
+            assertEquals(counter++, summary.getSize());
+        }
+    }
+
+    @Test
+    public void testListFlowFilesSortedDescending() throws 
InterruptedException {
+        for (int i = 0; i < 30050; i++) {
+            queue.put(new TestFlowFile(i));
+        }
+
+        final ListFlowFileStatus status = 
queue.listFlowFiles(UUID.randomUUID().toString(), 100, 
SortColumn.FLOWFILE_SIZE, SortDirection.DESCENDING);
+        assertNotNull(status);
+        assertEquals(30050, status.getQueueSize().getObjectCount());
+
+        while (status.getState() != ListFlowFileState.COMPLETE) {
+            Thread.sleep(100);
+        }
+
+        assertEquals(100, status.getFlowFileSummaries().size());
+        assertEquals(100, status.getCompletionPercentage());
+
+        assertNull(status.getFailureReason());
+        assertEquals(4, status.getTotalStepCount());
+        assertEquals(4, status.getCompletedStepCount());
+
+        int counter = 0;
+        for (final FlowFileSummary summary : status.getFlowFileSummaries()) {
+            assertEquals((30050 - 1 - counter++), summary.getSize());
+        }
+    }
+
+
     private class TestSwapManager implements FlowFileSwapManager {
         private final Map<String, List<FlowFileRecord>> swappedOut = new 
HashMap<>();
         int swapOutCalledCount = 0;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 73d76bd..4bc1222 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -21,6 +21,8 @@ import java.util.Date;
 import java.util.Set;
 import org.apache.nifi.controller.ScheduledState;
 
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.web.api.dto.BulletinBoardDTO;
@@ -33,9 +35,11 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.CounterDTO;
 import org.apache.nifi.web.api.dto.CountersDTO;
 import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+import org.apache.nifi.web.api.dto.FlowFileDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.FunnelDTO;
 import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.PortDTO;
@@ -119,6 +123,17 @@ public interface NiFiServiceFacade {
     DownloadableContent getContent(Long eventId, String uri, ContentDirection 
contentDirection);
 
     /**
+     * Gets the content for the specified flowfile in the specified connection.
+     *
+     * @param groupId group
+     * @param connectionId connection
+     * @param flowfileUuid flowfile
+     * @param uri uri
+     * @return content
+     */
+    DownloadableContent getContent(String groupId, String connectionId, String 
flowfileUuid, String uri);
+
+    /**
      * Retrieves provenance.
      *
      * @param queryId identifier
@@ -483,6 +498,14 @@ public interface NiFiServiceFacade {
     ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, 
String groupId, ConnectionDTO connectionDTO);
 
     /**
+     * Determines if this connection can be listed.
+     *
+     * @param groupId group
+     * @param connectionId connection
+     */
+    void verifyListQueue(String groupId, String connectionId);
+
+    /**
      * Determines if this connection can be created.
      *
      * @param groupId group
@@ -556,6 +579,48 @@ public interface NiFiServiceFacade {
      */
     DropRequestDTO deleteFlowFileDropRequest(String groupId, String 
connectionId, String dropRequestId);
 
+    /**
+     * Creates a new flow file listing request.
+     *
+     * @param groupId group
+     * @param connectionId The ID of the connection
+     * @param listingRequestId The ID of the listing request
+     * @param column sort column
+     * @param direction sort direction
+     * @return The ListingRequest
+     */
+    ListingRequestDTO createFlowFileListingRequest(String groupId, String 
connectionId, String listingRequestId, SortColumn column, SortDirection 
direction);
+
+    /**
+     * Gets a new flow file listing request.
+     *
+     * @param groupId group
+     * @param connectionId The ID of the connection
+     * @param listingRequestId The ID of the listing request
+     * @return The ListingRequest
+     */
+    ListingRequestDTO getFlowFileListingRequest(String groupId, String 
connectionId, String listingRequestId);
+
+    /**
+     * Deletes a new flow file listing request.
+     *
+     * @param groupId group
+     * @param connectionId The ID of the connection
+     * @param listingRequestId The ID of the listing request
+     * @return The ListingRequest
+     */
+    ListingRequestDTO deleteFlowFileListingRequest(String groupId, String 
connectionId, String listingRequestId);
+
+    /**
+     * Gets the specified flowfile from the specified connection.
+     *
+     * @param groupId group
+     * @param connectionId The ID of the connection
+     * @param flowFileUuid The UUID of the flowfile
+     * @return The FlowFileDTO
+     */
+    FlowFileDTO getFlowFile(String groupId, String connectionId, String 
flowFileUuid);
+
     // ----------------------------------------
     // InputPort methods
     // ----------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
index 2f75004..f994c52 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.MultivaluedMap;
 import org.apache.commons.lang3.StringUtils;
@@ -51,6 +53,12 @@ public class StandardNiFiContentAccess implements 
ContentAccess {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardNiFiContentAccess.class);
     public static final String CLIENT_ID_PARAM = "clientId";
 
+    private static final Pattern FLOWFILE_CONTENT_URI_PATTERN = Pattern
+        
.compile("/controller/process-groups/((?:root)|(?:[a-f0-9\\-]{36}))/connections/([a-f0-9\\-]{36})/flowfiles/([a-f0-9\\-]{36})/content");
+
+    private static final Pattern PROVENANCE_CONTENT_URI_PATTERN = Pattern
+        
.compile("/controller/provenance/events/([0-9]+)/content/((?:input)|(?:output))");
+
     private NiFiProperties properties;
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
@@ -115,21 +123,41 @@ public class StandardNiFiContentAccess implements 
ContentAccess {
             // create the downloadable content
             return new DownloadableContent(filename, contentType, 
clientResponse.getEntityInputStream());
         } else {
-            // example URI: 
http://localhost:8080/nifi-api/controller/provenance/events/1/content/input
-            final String eventDetails = 
StringUtils.substringAfterLast(request.getDataUri(), "events/");
-            final String rawEventId = 
StringUtils.substringBefore(eventDetails, "/content/");
-            final String rawDirection = 
StringUtils.substringAfterLast(eventDetails, "/content/");
+            // example URIs:
+            // 
http://localhost:8080/nifi-api/controller/provenance/events/{id}/content/{input|output}
+            // 
http://localhost:8080/nifi-api/controller/process-groups/{root|uuid}/connections/{uuid}/flowfiles/{uuid}/content
 
-            // get the content type
-            final Long eventId;
-            final ContentDirection direction;
-            try {
-                eventId = Long.parseLong(rawEventId);
-                direction = 
ContentDirection.valueOf(rawDirection.toUpperCase());
-            } catch (final IllegalArgumentException iae) {
+            // get just the context path for comparison
+            final String dataUri = 
StringUtils.substringAfter(request.getDataUri(), "/nifi-api");
+            if (StringUtils.isBlank(dataUri)) {
                 throw new IllegalArgumentException("The specified data 
reference URI is not valid.");
             }
-            return serviceFacade.getContent(eventId, request.getDataUri(), 
direction);
+
+            // flowfile listing content
+            final Matcher flowFileMatcher = 
FLOWFILE_CONTENT_URI_PATTERN.matcher(dataUri);
+            if (flowFileMatcher.matches()) {
+                final String groupId = flowFileMatcher.group(1);
+                final String connectionId = flowFileMatcher.group(2);
+                final String flowfileId = flowFileMatcher.group(3);
+
+                return serviceFacade.getContent(groupId, connectionId, 
flowfileId, dataUri);
+            }
+
+            // provenance event content
+            final Matcher provenanceMatcher = 
PROVENANCE_CONTENT_URI_PATTERN.matcher(dataUri);
+            if (provenanceMatcher.matches()) {
+                try {
+                    final Long eventId = 
Long.parseLong(provenanceMatcher.group(1));
+                    final ContentDirection direction = 
ContentDirection.valueOf(provenanceMatcher.group(2).toUpperCase());
+
+                    return serviceFacade.getContent(eventId, dataUri, 
direction);
+                } catch (final IllegalArgumentException iae) {
+                    throw new IllegalArgumentException("The specified data 
reference URI is not valid.");
+                }
+            }
+
+            // invalid uri
+            throw new IllegalArgumentException("The specified data reference 
URI is not valid.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index e7a3328..2f92588 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -62,6 +62,8 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
@@ -80,6 +82,8 @@ import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.web.api.dto.FlowFileDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
 import org.apache.nifi.web.security.user.NiFiUserUtils;
 import org.apache.nifi.user.AccountStatus;
 import org.apache.nifi.user.NiFiUser;
@@ -217,6 +221,12 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     // -----------------------------------------
     // Verification Operations
     // -----------------------------------------
+
+    @Override
+    public void verifyListQueue(String groupId, String connectionId) {
+        connectionDAO.verifyList(groupId, connectionId);
+    }
+
     @Override
     public void verifyCreateConnection(String groupId, ConnectionDTO 
connectionDTO) {
         connectionDAO.verifyCreate(groupId, connectionDTO);
@@ -817,6 +827,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public ListingRequestDTO deleteFlowFileListingRequest(String groupId, 
String connectionId, String listingRequestId) {
+        return 
dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(groupId,
 connectionId, listingRequestId));
+    }
+
+    @Override
     public ConfigurationSnapshot<Void> deleteProcessor(final Revision 
revision, final String groupId, final String processorId) {
         return optimisticLockingManager.configureFlow(revision, new 
ConfigurationRequest<Void>() {
             @Override
@@ -1069,7 +1084,12 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
 
     @Override
     public DropRequestDTO createFlowFileDropRequest(String groupId, String 
connectionId, String dropRequestId) {
-        return 
dtoFactory.createDropRequestDTO(connectionDAO.createFileFlowDropRequest(groupId,
 connectionId, dropRequestId));
+        return 
dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(groupId,
 connectionId, dropRequestId));
+    }
+
+    @Override
+    public ListingRequestDTO createFlowFileListingRequest(String groupId, 
String connectionId, String listingRequestId, SortColumn column, SortDirection 
direction) {
+        return 
dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(groupId,
 connectionId, listingRequestId, column, direction));
     }
 
     @Override
@@ -1949,6 +1969,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public DownloadableContent getContent(String groupId, String connectionId, 
String flowFileUuid, String uri) {
+        return connectionDAO.getContent(groupId, connectionId, flowFileUuid, 
uri);
+    }
+
+    @Override
     public DownloadableContent getContent(Long eventId, String uri, 
ContentDirection contentDirection) {
         return controllerFacade.getContent(eventId, uri, contentDirection);
     }
@@ -2127,6 +2152,16 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
+    public ListingRequestDTO getFlowFileListingRequest(String groupId, String 
connectionId, String listingRequestId) {
+        return 
dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(groupId,
 connectionId, listingRequestId));
+    }
+
+    @Override
+    public FlowFileDTO getFlowFile(String groupId, String connectionId, String 
flowFileUuid) {
+        return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(groupId, 
connectionId, flowFileUuid));
+    }
+
+    @Override
     public StatusHistoryDTO getConnectionStatusHistory(String groupId, String 
connectionId) {
         return controllerFacade.getConnectionStatusHistory(groupId, 
connectionId);
     }

Reply via email to