Repository: nifi Updated Branches: refs/heads/NIFI-108 670733753 -> b12aba782
NIFI-108: Implementing ability to list FlowFiles in a queue Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/245d8d4d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/245d8d4d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/245d8d4d Branch: refs/heads/NIFI-108 Commit: 245d8d4d2d3fbd6182e7e587d3433cc7378cc37a Parents: 2e22954 Author: Mark Payne <[email protected]> Authored: Thu Dec 17 13:00:30 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Thu Dec 17 13:00:30 2015 -0500 ---------------------------------------------------------------------- .../nifi/controller/queue/FlowFileQueue.java | 33 ++- .../controller/queue/ListFlowFileStatus.java | 5 + .../cluster/manager/impl/WebClusterManager.java | 8 + .../controller/queue/ListFlowFileRequest.java | 79 +++++-- .../apache/nifi/controller/FlowController.java | 39 +++- .../nifi/controller/StandardFlowFileQueue.java | 234 ++++++++++++++++++- 6 files changed, 363 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index fe8649d..dbf2f04 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.queue; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Queue; @@ -217,12 +218,32 @@ public interface FlowFileQueue { * will be returned ordered by the position of the FlowFile in the queue. 
* * @param requestIdentifier the identifier of the List FlowFile Request + * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus + * + * @return the status for the request + * + * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs + * is currently running. + */ + ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults); + + /** + * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a + * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist + * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that + * can then be passed to the {@link #getListFlowFileStatus(String)} + * + * @param requestIdentifier the identifier of the List FlowFile Request + * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus + * @param sortColumn specifies which column to sort on + * @param direction specifies which direction to sort the FlowFiles + * * @return the status for the request * * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs * is currently running. */ - ListFlowFileStatus listFlowFiles(String requestIdentifier); + ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, SortColumn sortColumn, SortDirection direction); /** * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a @@ -231,6 +252,9 @@ public interface FlowFileQueue { * can then be passed to the {@link #getListFlowFileStatus(String)} * * @param requestIdentifier the identifier of the List FlowFile Request + * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus + * @param query an Expression Language expression that will be evaluated against all FlowFiles. 
Only FlowFiles that satisfy the expression will + * be included in the results. The expression must be a valid expression and return a Boolean type * @param sortColumn specifies which column to sort on * @param direction specifies which direction to sort the FlowFiles * @@ -238,8 +262,9 @@ public interface FlowFileQueue { * * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs * is currently running. + * @throws IllegalArgumentException if query is not a valid Expression Language expression or does not return a boolean type */ - ListFlowFileStatus listFlowFiles(String requestIdentifier, SortColumn sortColumn, SortDirection direction); + ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, String query, SortColumn sortColumn, SortDirection direction); /** * Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String)} @@ -269,6 +294,8 @@ public interface FlowFileQueue { * @param flowFileUuid the UUID of the FlowFile to retrieve * @return the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue * with the given UUID + * + * @throws IOException if unable to read FlowFiles that are stored on some external device */ - FlowFileRecord getFlowFile(String flowFileUuid); + FlowFileRecord getFlowFile(String flowFileUuid) throws IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java index 2959170..5781283 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java +++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java @@ -67,4 +67,9 @@ public interface ListFlowFileStatus { * @return a List of FlowFileSummary objects */ List<FlowFileSummary> getFlowFileSummaries(); + + /** + * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed + */ + int getCompletionPercentage(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 5e9dd3c..035a888 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -321,6 +321,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}"); + public static final Pattern LIST_FLOWFILES_URI = Pattern + 
.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}"); private final NiFiProperties properties; private final HttpRequestReplicator httpRequestReplicator; @@ -2431,6 +2433,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches(); } + private static boolean isListFlowFilesEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && LIST_FLOWFILES_URI.matcher(uri.getPath()).matches(); + } + private static boolean isCountersEndpoint(final URI uri) { return COUNTERS_URI.matcher(uri.getPath()).matches(); } @@ -3309,6 +3315,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C mergeDropRequests(dropRequest, resultsMap); clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) { + // TODO: IMPLEMENT } else { if (!nodeResponsesToDrain.isEmpty()) { drainResponses(nodeResponsesToDrain); http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java index 03e0188..e83fd80 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java @@ -17,63 +17,93 @@ package org.apache.nifi.controller.queue; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class ListFlowFileRequest implements ListFlowFileStatus { + private final String requestId; + private final QueueSize queueSize; + private final SortColumn sortColumn; + private final SortDirection sortDirection; + private final long submissionTime = System.currentTimeMillis(); + private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>(); + private ListFlowFileState state; + private String failureReason; + private int numSteps; + private int numCompletedSteps; + private long lastUpdated = System.currentTimeMillis(); + + public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final int numSteps) { + this.requestId = requestId; + this.sortColumn = sortColumn; + this.sortDirection = sortDirection; + this.queueSize = queueSize; + this.numSteps = numSteps; + } @Override public String getRequestIdentifier() { - // TODO Auto-generated method stub - return null; + return requestId; } @Override public long getRequestSubmissionTime() { - // TODO Auto-generated method stub - return 0; + return submissionTime; } @Override - public long getLastUpdated() { - // TODO Auto-generated method stub - return 0; + public synchronized long getLastUpdated() { + return lastUpdated; } @Override public SortColumn getSortColumn() { - // TODO Auto-generated method stub - return null; + return sortColumn; } @Override public SortDirection getSortDirection() { - // TODO Auto-generated method stub - return null; + return sortDirection; } @Override - public ListFlowFileState getState() { - // TODO Auto-generated method stub - return null; + public synchronized ListFlowFileState getState() { + return 
state; } @Override - public String getFailureReason() { - // TODO Auto-generated method stub - return null; + public synchronized String getFailureReason() { + return failureReason; + } + + public synchronized void setState(final ListFlowFileState state) { + this.state = state; + this.lastUpdated = System.currentTimeMillis(); + } + + public synchronized void setFailure(final String explanation) { + this.state = ListFlowFileState.FAILURE; + this.failureReason = explanation; + this.lastUpdated = System.currentTimeMillis(); } @Override - public List<FlowFileSummary> getFlowFileSummaries() { - // TODO Auto-generated method stub - return null; + public synchronized List<FlowFileSummary> getFlowFileSummaries() { + return Collections.unmodifiableList(flowFileSummaries); + } + + public synchronized void addFlowFileSummaries(final List<FlowFileSummary> summaries) { + // TODO: Implement. + + lastUpdated = System.currentTimeMillis(); + this.numCompletedSteps++; } @Override public QueueSize getQueueSize() { - // TODO Auto-generated method stub - return null; + return queueSize; } public synchronized boolean cancel() { @@ -84,4 +114,9 @@ public class ListFlowFileRequest implements ListFlowFileStatus { this.state = ListFlowFileState.CANCELED; return true; } + + @Override + public synchronized int getCompletionPercentage() { + return (int) (100F * numCompletedSteps / numSteps); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2ad102f..dd3b687 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -177,6 +177,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; + +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -3242,8 +3244,41 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } public InputStream getContent(final FlowFileRecord flowFile, final String requestor, final String requestUri) throws IOException { - // TODO: IMPLEMENT - return null; + requireNonNull(flowFile); + requireNonNull(requestor); + requireNonNull(requestUri); + + final InputStream stream; + final ResourceClaim resourceClaim; + final ContentClaim contentClaim = flowFile.getContentClaim(); + if (contentClaim == null) { + resourceClaim = null; + stream = new ByteArrayInputStream(new byte[0]); + } else { + resourceClaim = flowFile.getContentClaim().getResourceClaim(); + stream = contentRepository.read(flowFile.getContentClaim()); + } + + // Register a Provenance Event to indicate that the content was downloaded. 
+ final StandardProvenanceEventRecord.Builder sendEventBuilder = new StandardProvenanceEventRecord.Builder() + .setEventType(ProvenanceEventType.DOWNLOAD) + .setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key())) + .setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap()) + .setTransitUri(requestUri) + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(flowFile.getEntryDate()) + .setLineageStartDate(flowFile.getLineageStartDate()) + .setComponentType(getName()) + .setComponentId(getRootGroupId()) + .setDetails("Download of Content requested by " + requestor + " for " + flowFile); + + if (contentClaim != null) { + sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize()); + } + + final ProvenanceEventRecord sendEvent = sendEventBuilder.build(); + provenanceEventRepository.registerEvent(sendEvent); + return stream; } private String getReplayFailureReason(final ProvenanceEventRecord event) { http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index cf0d185..9935ba4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -35,11 +35,16 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.nifi.attribute.expression.language.PreparedQuery; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileSummary; import org.apache.nifi.controller.queue.ListFlowFileRequest; +import org.apache.nifi.controller.queue.ListFlowFileState; import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.SortColumn; @@ -53,8 +58,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; @@ -100,7 +107,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final int swapThreshold; private final FlowFileSwapManager swapManager; private final List<String> swapLocations = new ArrayList<>(); - @SuppressWarnings("unused") private final TimedLock readLock; private final TimedLock writeLock; private final String identifier; @@ -841,16 +847,194 @@ public final class 
StandardFlowFileQueue implements FlowFileQueue { @Override - public ListFlowFileStatus listFlowFiles(final String requestIdentifier) { - return listFlowFiles(requestIdentifier, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING); + public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) { + return listFlowFiles(requestIdentifier, maxResults, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING); } @Override - public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final SortColumn sortColumn, final SortDirection direction) { - // TODO: Implement - return null; + public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final SortColumn sortColumn, final SortDirection direction) { + return listFlowFiles(requestIdentifier, maxResults, null, sortColumn, direction); + } + + @Override + public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final String query, final SortColumn sortColumn, final SortDirection direction) { + final PreparedQuery preparedQuery; + if (query == null) { + preparedQuery = null; + } else { + try { + final ResultType resultType = Query.compile(query).getResultType(); + if (resultType != ResultType.BOOLEAN) { + throw new IllegalArgumentException("Invalid expression Language provided to search the listing of FlowFiles. " + + "The expression must return a 'Boolean' type but returns a " + resultType.name() + " type"); + } + preparedQuery = Query.prepare(query); + } catch (final AttributeExpressionLanguageParsingException e) { + throw new IllegalArgumentException("Invalid Expression Language provided to search the listing of FlowFiles: " + query, e); + } + } + + // purge any old requests from the map just to keep it clean. 
But if there are very few requests, which is usually the case, then don't bother + if (listRequestMap.size() > 10) { + final List<String> toDrop = new ArrayList<>(); + for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) { + final ListFlowFileRequest request = entry.getValue(); + final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE; + + if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) { + toDrop.add(entry.getKey()); + } + } + + for (final String requestId : toDrop) { + listRequestMap.remove(requestId); + } + } + + final int numSteps; + readLock.lock(); + try { + numSteps = 1 + swapLocations.size(); + } finally { + readLock.unlock("listFlowFiles"); + } + + final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps); + + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + int position = 0; + int resultCount = 0; + final List<FlowFileSummary> summaries = new ArrayList<>(activeQueue.size()); + + // we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it + // continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular + // order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order. 
+ writeLock.lock(); + try { + final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size()); + + FlowFileRecord flowFile; + try { + while ((flowFile = activeQueue.poll()) != null) { + flowFileRecords.add(flowFile); + position++; + + if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { + summaries.add(summarize(flowFile, position)); + if (++resultCount >= maxResults) { + break; + } + } + } + } finally { + activeQueue.addAll(flowFileRecords); + } + } finally { + writeLock.unlock("List FlowFiles"); + } + + position = activeQueue.size(); + sourceLoop: while (resultCount < maxResults) { + try { + // We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since + // we are not modifying anything. + readLock.lock(); + try { + for (final String location : swapLocations) { + final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this); + for (final FlowFileRecord flowFile : flowFiles) { + position++; + + if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { + summaries.add(summarize(flowFile, position)); + if (++resultCount >= maxResults) { + break sourceLoop; + } + } + } + } + + for (final FlowFileRecord flowFile : swapQueue) { + position++; + + if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { + summaries.add(summarize(flowFile, position)); + if (++resultCount >= maxResults) { + break sourceLoop; + } + } + } + } finally { + readLock.unlock("List FlowFiles"); + } + } catch (final IOException ioe) { + logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe); + listRequest.setFailure("Could not read FlowFiles from queue. 
Check log files for more details."); + } + } + } + }, "List FlowFiles for Connection " + getIdentifier()); + t.setDaemon(true); + t.start(); + + listRequestMap.put(requestIdentifier, listRequest); + return listRequest; } + private FlowFileSummary summarize(final FlowFile flowFile, final int position) { + // extract all of the information that we care about into new variables rather than just + // wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to + // be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object, + // we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well, + // which can be problematic if we expect them to be swapped out. + final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final long size = flowFile.getSize(); + final long lastQueuedTime = flowFile.getLastQueueDate(); + final long lineageStart = flowFile.getLineageStartDate(); + final boolean penalized = flowFile.isPenalized(); + + return new FlowFileSummary() { + @Override + public String getUuid() { + return uuid; + } + + @Override + public String getFilename() { + return filename; + } + + @Override + public int getPosition() { + return position; + } + + @Override + public long getSize() { + return size; + } + + @Override + public long lastQueuedTime() { + return lastQueuedTime; + } + + @Override + public long getLineageStartDate() { + return lineageStart; + } + + @Override + public boolean isPenalized() { + return penalized; + } + }; + } + + @Override public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) { return listRequestMap.get(requestIdentifier); @@ -858,6 +1042,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) { + logger.info("Canceling 
ListFlowFile Request with ID {}", requestIdentifier); final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier); if (request != null) { request.cancel(); @@ -867,8 +1052,41 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override - public FlowFileRecord getFlowFile(String flowFileUuid) { - // TODO: Implement + public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException { + if (flowFileUuid == null) { + return null; + } + + readLock.lock(); + try { + // read through all of the FlowFiles in the queue, looking for the FlowFile with the given ID + for (final FlowFileRecord flowFile : activeQueue) { + if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { + return flowFile; + } + } + + for (final FlowFileRecord flowFile : swapQueue) { + if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { + return flowFile; + } + } + + // TODO: consider using a Long flowFileId instead of a UUID, and then having the swap manager + // write out the min and max FlowFile ID's. This would allow us to then have a method: boolean isFlowFilePossiblyContained(long id) + // which can return a boolean value that can be used to determine whether or not to even call peek + for (final String swapLocation : swapLocations) { + final List<FlowFileRecord> flowFiles = swapManager.peek(swapLocation, this); + for (final FlowFileRecord flowFile : flowFiles) { + if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { + return flowFile; + } + } + } + } finally { + readLock.unlock("getFlowFile"); + } + return null; }
