Repository: nifi
Updated Branches:
  refs/heads/NIFI-108 670733753 -> b12aba782


NIFI-108: Implementing ability to list FlowFiles in a queue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/245d8d4d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/245d8d4d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/245d8d4d

Branch: refs/heads/NIFI-108
Commit: 245d8d4d2d3fbd6182e7e587d3433cc7378cc37a
Parents: 2e22954
Author: Mark Payne <[email protected]>
Authored: Thu Dec 17 13:00:30 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Thu Dec 17 13:00:30 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/queue/FlowFileQueue.java    |  33 ++-
 .../controller/queue/ListFlowFileStatus.java    |   5 +
 .../cluster/manager/impl/WebClusterManager.java |   8 +
 .../controller/queue/ListFlowFileRequest.java   |  79 +++++--
 .../apache/nifi/controller/FlowController.java  |  39 +++-
 .../nifi/controller/StandardFlowFileQueue.java  | 234 ++++++++++++++++++-
 6 files changed, 363 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index fe8649d..dbf2f04 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.queue;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Queue;
@@ -217,12 +218,32 @@ public interface FlowFileQueue {
      * will be returned ordered by the position of the FlowFile in the queue.
      *
      * @param requestIdentifier the identifier of the List FlowFile Request
+     * @param maxResults the maximum number of FlowFileSummary objects to add 
to the ListFlowFileStatus
+     *
+     * @return the status for the request
+     *
+     * @throws IllegalStateException if either the source or the destination 
of the connection to which this queue belongs
+     *             is currently running.
+     */
+    ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults);
+
+    /**
+     * Initiates a request to obtain a listing of FlowFiles in this queue. 
This method returns a
+     * ListFlowFileStatus that can be used to obtain information about the 
FlowFiles that exist
+     * within the queue. Additionally, the ListFlowFileStatus provides a 
request identifier that
+     * can then be passed to {@link #getListFlowFileStatus(String)} to check the status of the request
+     *
+     * @param requestIdentifier the identifier of the List FlowFile Request
+     * @param maxResults the maximum number of FlowFileSummary objects to add 
to the ListFlowFileStatus
+     * @param sortColumn specifies which column to sort on
+     * @param direction specifies which direction to sort the FlowFiles
+     *
      * @return the status for the request
      *
      * @throws IllegalStateException if either the source or the destination 
of the connection to which this queue belongs
      *             is currently running.
      */
-    ListFlowFileStatus listFlowFiles(String requestIdentifier);
+    ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, 
SortColumn sortColumn, SortDirection direction);
 
     /**
      * Initiates a request to obtain a listing of FlowFiles in this queue. 
This method returns a
@@ -231,6 +252,9 @@ public interface FlowFileQueue {
      * can then be passed to the {@link #getListFlowFileStatus(String)}
      *
      * @param requestIdentifier the identifier of the List FlowFile Request
+     * @param maxResults the maximum number of FlowFileSummary objects to add 
to the ListFlowFileStatus
+     * @param query an Expression Language expression that will be evaluated 
against all FlowFiles. Only FlowFiles that satisfy the expression will
+     *            be included in the results. The expression must be a valid 
expression and return a Boolean type
      * @param sortColumn specifies which column to sort on
      * @param direction specifies which direction to sort the FlowFiles
      *
@@ -238,8 +262,9 @@ public interface FlowFileQueue {
      *
      * @throws IllegalStateException if either the source or the destination 
of the connection to which this queue belongs
      *             is currently running.
+     * @throws IllegalArgumentException if query is not a valid Expression 
Language expression or does not return a boolean type
      */
-    ListFlowFileStatus listFlowFiles(String requestIdentifier, SortColumn 
sortColumn, SortDirection direction);
+    ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, 
String query, SortColumn sortColumn, SortDirection direction);
 
     /**
      * Returns the current status of a List FlowFile Request that was 
initiated via the {@link #listFlowFiles(String)}
@@ -269,6 +294,8 @@ public interface FlowFileQueue {
      * @param flowFileUuid the UUID of the FlowFile to retrieve
      * @return the FlowFile with the given UUID or <code>null</code> if no 
FlowFile can be found in this queue
      *         with the given UUID
+     *
+     * @throws IOException if unable to read FlowFiles that are stored on some 
external device
      */
-    FlowFileRecord getFlowFile(String flowFileUuid);
+    FlowFileRecord getFlowFile(String flowFileUuid) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
index 2959170..5781283 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
@@ -67,4 +67,9 @@ public interface ListFlowFileStatus {
      * @return a List of FlowFileSummary objects
      */
     List<FlowFileSummary> getFlowFileSummaries();
+
+    /**
+     * @return the percentage (an integer between 0 and 100, inclusive) of how 
close the request is to being completed
+     */
+    int getCompletionPercentage();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 5e9dd3c..035a888 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -321,6 +321,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
     public static final Pattern QUEUE_CONTENTS_URI = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
     public static final Pattern DROP_REQUEST_URI = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
+    public static final Pattern LIST_FLOWFILES_URI = Pattern
+        
.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}");
 
     private final NiFiProperties properties;
     private final HttpRequestReplicator httpRequestReplicator;
@@ -2431,6 +2433,10 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         return "GET".equalsIgnoreCase(method) && 
PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
     }
 
+    private static boolean isListFlowFilesEndpoint(final URI uri, final String 
method) {
+        return "GET".equalsIgnoreCase(method) && 
LIST_FLOWFILES_URI.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isCountersEndpoint(final URI uri) {
         return COUNTERS_URI.matcher(uri.getPath()).matches();
     }
@@ -3309,6 +3315,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             mergeDropRequests(dropRequest, resultsMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, 
method)) {
+            // TODO: IMPLEMENT
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index 03e0188..e83fd80 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -17,63 +17,93 @@
 
 package org.apache.nifi.controller.queue;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class ListFlowFileRequest implements ListFlowFileStatus {
+    private final String requestId;
+    private final QueueSize queueSize;
+    private final SortColumn sortColumn;
+    private final SortDirection sortDirection;
+    private final long submissionTime = System.currentTimeMillis();
+    private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>();
+
     private ListFlowFileState state;
+    private String failureReason;
+    private int numSteps;
+    private int numCompletedSteps;
+    private long lastUpdated = System.currentTimeMillis();
+
+    public ListFlowFileRequest(final String requestId, final SortColumn 
sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final 
int numSteps) {
+        this.requestId = requestId;
+        this.sortColumn = sortColumn;
+        this.sortDirection = sortDirection;
+        this.queueSize = queueSize;
+        this.numSteps = numSteps;
+    }
 
     @Override
     public String getRequestIdentifier() {
-        // TODO Auto-generated method stub
-        return null;
+        return requestId;
     }
 
     @Override
     public long getRequestSubmissionTime() {
-        // TODO Auto-generated method stub
-        return 0;
+        return submissionTime;
     }
 
     @Override
-    public long getLastUpdated() {
-        // TODO Auto-generated method stub
-        return 0;
+    public synchronized long getLastUpdated() {
+        return lastUpdated;
     }
 
     @Override
     public SortColumn getSortColumn() {
-        // TODO Auto-generated method stub
-        return null;
+        return sortColumn;
     }
 
     @Override
     public SortDirection getSortDirection() {
-        // TODO Auto-generated method stub
-        return null;
+        return sortDirection;
     }
 
     @Override
-    public ListFlowFileState getState() {
-        // TODO Auto-generated method stub
-        return null;
+    public synchronized ListFlowFileState getState() {
+        return state;
     }
 
     @Override
-    public String getFailureReason() {
-        // TODO Auto-generated method stub
-        return null;
+    public synchronized String getFailureReason() {
+        return failureReason;
+    }
+
+    public synchronized void setState(final ListFlowFileState state) {
+        this.state = state;
+        this.lastUpdated = System.currentTimeMillis();
+    }
+
+    public synchronized void setFailure(final String explanation) {
+        this.state = ListFlowFileState.FAILURE;
+        this.failureReason = explanation;
+        this.lastUpdated = System.currentTimeMillis();
     }
 
     @Override
-    public List<FlowFileSummary> getFlowFileSummaries() {
-        // TODO Auto-generated method stub
-        return null;
+    public synchronized List<FlowFileSummary> getFlowFileSummaries() {
+        return Collections.unmodifiableList(flowFileSummaries);
+    }
+
+    public synchronized void addFlowFileSummaries(final List<FlowFileSummary> 
summaries) {
+        // TODO: Implement.
+
+        lastUpdated = System.currentTimeMillis();
+        this.numCompletedSteps++;
     }
 
     @Override
     public QueueSize getQueueSize() {
-        // TODO Auto-generated method stub
-        return null;
+        return queueSize;
     }
 
     public synchronized boolean cancel() {
@@ -84,4 +114,9 @@ public class ListFlowFileRequest implements 
ListFlowFileStatus {
         this.state = ListFlowFileState.CANCELED;
         return true;
     }
+
+    @Override
+    public synchronized int getCompletionPercentage() {
+        return (int) (100F * numCompletedSteps / numSteps);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ad102f..dd3b687 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -177,6 +177,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
+
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -3242,8 +3244,41 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     public InputStream getContent(final FlowFileRecord flowFile, final String 
requestor, final String requestUri) throws IOException {
-        // TODO: IMPLEMENT
-        return null;
+        requireNonNull(flowFile);
+        requireNonNull(requestor);
+        requireNonNull(requestUri);
+
+        final InputStream stream;
+        final ResourceClaim resourceClaim;
+        final ContentClaim contentClaim = flowFile.getContentClaim();
+        if (contentClaim == null) {
+            resourceClaim = null;
+            stream = new ByteArrayInputStream(new byte[0]);
+        } else {
+            resourceClaim = flowFile.getContentClaim().getResourceClaim();
+            stream = contentRepository.read(flowFile.getContentClaim());
+        }
+
+        // Register a Provenance Event to indicate that the content was downloaded.
+        final StandardProvenanceEventRecord.Builder sendEventBuilder = new 
StandardProvenanceEventRecord.Builder()
+            .setEventType(ProvenanceEventType.DOWNLOAD)
+            .setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key()))
+            .setAttributes(flowFile.getAttributes(), Collections.<String, 
String> emptyMap())
+            .setTransitUri(requestUri)
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(flowFile.getEntryDate())
+            .setLineageStartDate(flowFile.getLineageStartDate())
+            .setComponentType(getName())
+            .setComponentId(getRootGroupId())
+            .setDetails("Download of Content requested by " + requestor + " 
for " + flowFile);
+
+        if (contentClaim != null) {
+            
sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), 
flowFile.getSize());
+        }
+
+        final ProvenanceEventRecord sendEvent = sendEventBuilder.build();
+        provenanceEventRepository.registerEvent(sendEvent);
+        return stream;
     }
 
     private String getReplayFailureReason(final ProvenanceEventRecord event) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index cf0d185..9935ba4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -35,11 +35,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.nifi.attribute.expression.language.PreparedQuery;
+import org.apache.nifi.attribute.expression.language.Query;
+import 
org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileSummary;
 import org.apache.nifi.controller.queue.ListFlowFileRequest;
+import org.apache.nifi.controller.queue.ListFlowFileState;
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.SortColumn;
@@ -53,8 +58,10 @@ import 
org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
@@ -100,7 +107,6 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     private final int swapThreshold;
     private final FlowFileSwapManager swapManager;
     private final List<String> swapLocations = new ArrayList<>();
-    @SuppressWarnings("unused")
     private final TimedLock readLock;
     private final TimedLock writeLock;
     private final String identifier;
@@ -841,16 +847,194 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
 
 
     @Override
-    public ListFlowFileStatus listFlowFiles(final String requestIdentifier) {
-        return listFlowFiles(requestIdentifier, SortColumn.QUEUE_POSITION, 
SortDirection.ASCENDING);
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, 
final int maxResults) {
+        return listFlowFiles(requestIdentifier, maxResults, 
SortColumn.QUEUE_POSITION, SortDirection.ASCENDING);
     }
 
     @Override
-    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, 
final SortColumn sortColumn, final SortDirection direction) {
-        // TODO: Implement
-        return null;
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, 
final int maxResults, final SortColumn sortColumn, final SortDirection 
direction) {
+        return listFlowFiles(requestIdentifier, maxResults, null, sortColumn, 
direction);
+    }
+
+    @Override
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, 
final int maxResults, final String query, final SortColumn sortColumn, final 
SortDirection direction) {
+        final PreparedQuery preparedQuery;
+        if (query == null) {
+            preparedQuery = null;
+        } else {
+            try {
+                final ResultType resultType = 
Query.compile(query).getResultType();
+                if (resultType != ResultType.BOOLEAN) {
+                    throw new IllegalArgumentException("Invalid expression 
Language provided to search the listing of FlowFiles. "
+                        + "The expression must return a 'Boolean' type but 
returns a " + resultType.name() + " type");
+                }
+                preparedQuery = Query.prepare(query);
+            } catch (final AttributeExpressionLanguageParsingException e) {
+                throw new IllegalArgumentException("Invalid Expression 
Language provided to search the listing of FlowFiles: " + query, e);
+            }
+        }
+
+        // purge any old requests from the map just to keep it clean. But if 
there are very few requests, which is usually the case, then don't bother
+        if (listRequestMap.size() > 10) {
+            final List<String> toDrop = new ArrayList<>();
+            for (final Map.Entry<String, ListFlowFileRequest> entry : 
listRequestMap.entrySet()) {
+                final ListFlowFileRequest request = entry.getValue();
+                final boolean completed = request.getState() == 
ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
+
+                if (completed && System.currentTimeMillis() - 
request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+                    toDrop.add(entry.getKey());
+                }
+            }
+
+            for (final String requestId : toDrop) {
+                listRequestMap.remove(requestId);
+            }
+        }
+
+        final int numSteps;
+        readLock.lock();
+        try {
+            numSteps = 1 + swapLocations.size();
+        } finally {
+            readLock.unlock("listFlowFiles");
+        }
+
+        final ListFlowFileRequest listRequest = new 
ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps);
+
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int position = 0;
+                int resultCount = 0;
+                final List<FlowFileSummary> summaries = new 
ArrayList<>(activeQueue.size());
+
+                // we need a write lock while using the Active Queue because 
we can't iterate over it - we have to poll from it
+                // continually. This is because the iterator for PriorityQueue 
does not iterate over the elements in any particular
+                // order. Since we need the 'position' of the element in the 
queue, we need to iterate over them in the proper order.
+                writeLock.lock();
+                try {
+                    final List<FlowFileRecord> flowFileRecords = new 
ArrayList<>(activeQueue.size());
+
+                    FlowFileRecord flowFile;
+                    try {
+                        while ((flowFile = activeQueue.poll()) != null) {
+                            flowFileRecords.add(flowFile);
+                            position++;
+
+                            if (preparedQuery == null || 
"true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                summaries.add(summarize(flowFile, position));
+                                if (++resultCount >= maxResults) {
+                                    break;
+                                }
+                            }
+                        }
+                    } finally {
+                        activeQueue.addAll(flowFileRecords);
+                    }
+                } finally {
+                    writeLock.unlock("List FlowFiles");
+                }
+
+                position = activeQueue.size();
+                sourceLoop: while (resultCount < maxResults) {
+                    try {
+                        // We are now iterating over swap files, and we don't 
need the write lock for this, just the read lock, since
+                        // we are not modifying anything.
+                        readLock.lock();
+                        try {
+                            for (final String location : swapLocations) {
+                                final List<FlowFileRecord> flowFiles = 
swapManager.peek(location, StandardFlowFileQueue.this);
+                                for (final FlowFileRecord flowFile : 
flowFiles) {
+                                    position++;
+
+                                    if (preparedQuery == null || 
"true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                        summaries.add(summarize(flowFile, 
position));
+                                        if (++resultCount >= maxResults) {
+                                            break sourceLoop;
+                                        }
+                                    }
+                                }
+                            }
+
+                            for (final FlowFileRecord flowFile : swapQueue) {
+                                position++;
+
+                                if (preparedQuery == null || 
"true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                    summaries.add(summarize(flowFile, 
position));
+                                    if (++resultCount >= maxResults) {
+                                        break sourceLoop;
+                                    }
+                                }
+                            }
+                        } finally {
+                            readLock.unlock("List FlowFiles");
+                        }
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to read swapped FlowFiles in 
order to perform listing of queue " + StandardFlowFileQueue.this, ioe);
+                        listRequest.setFailure("Could not read FlowFiles from 
queue. Check log files for more details.");
+                    }
+                }
+            }
+        }, "List FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
+
+        listRequestMap.put(requestIdentifier, listRequest);
+        return listRequest;
     }
 
+    private FlowFileSummary summarize(final FlowFile flowFile, final int 
position) {
+        // extract all of the information that we care about into new 
variables rather than just
+        // wrapping the FlowFile object with a FlowFileSummary object. We do 
this because we want to
+        // be able to hold many FlowFileSummary objects in memory and if we 
just wrap the FlowFile object,
+        // we will end up holding the entire FlowFile (including all 
Attributes) in the Java heap as well,
+        // which can be problematic if we expect them to be swapped out.
+        final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+        final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final long size = flowFile.getSize();
+        final long lastQueuedTime = flowFile.getLastQueueDate();
+        final long lineageStart = flowFile.getLineageStartDate();
+        final boolean penalized = flowFile.isPenalized();
+
+        return new FlowFileSummary() {
+            @Override
+            public String getUuid() {
+                return uuid;
+            }
+
+            @Override
+            public String getFilename() {
+                return filename;
+            }
+
+            @Override
+            public int getPosition() {
+                return position;
+            }
+
+            @Override
+            public long getSize() {
+                return size;
+            }
+
+            @Override
+            public long lastQueuedTime() {
+                return lastQueuedTime;
+            }
+
+            @Override
+            public long getLineageStartDate() {
+                return lineageStart;
+            }
+
+            @Override
+            public boolean isPenalized() {
+                return penalized;
+            }
+        };
+    }
+
+
     @Override
     public ListFlowFileStatus getListFlowFileStatus(final String 
requestIdentifier) {
         return listRequestMap.get(requestIdentifier);
@@ -858,6 +1042,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
 
     @Override
     public ListFlowFileStatus cancelListFlowFileRequest(final String 
requestIdentifier) {
+        logger.info("Canceling ListFlowFile Request with ID {}", 
requestIdentifier);
         final ListFlowFileRequest request = 
listRequestMap.remove(requestIdentifier);
         if (request != null) {
             request.cancel();
@@ -867,8 +1052,41 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
     }
 
     @Override
-    public FlowFileRecord getFlowFile(String flowFileUuid) {
-        // TODO: Implement
+    public FlowFileRecord getFlowFile(final String flowFileUuid) throws 
IOException {
+        if (flowFileUuid == null) {
+            return null;
+        }
+
+        readLock.lock();
+        try {
+            // read through all of the FlowFiles in the queue, looking for the 
FlowFile with the given ID
+            for (final FlowFileRecord flowFile : activeQueue) {
+                if 
(flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                    return flowFile;
+                }
+            }
+
+            for (final FlowFileRecord flowFile : swapQueue) {
+                if 
(flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                    return flowFile;
+                }
+            }
+
+            // TODO: consider using a Long flowFileId instead of a UUID, and 
then having the swap manager
+            // write out the min and max FlowFile ID's. This would allow us to 
then have a method: boolean isFlowFilePossiblyContained(long id)
+            // which can return a boolean value that can be used to determine 
whether or not to even call peek
+            for (final String swapLocation : swapLocations) {
+                final List<FlowFileRecord> flowFiles = 
swapManager.peek(swapLocation, this);
+                for (final FlowFileRecord flowFile : flowFiles) {
+                    if 
(flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                        return flowFile;
+                    }
+                }
+            }
+        } finally {
+            readLock.unlock("getFlowFile");
+        }
+
         return null;
     }
 

Reply via email to