http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index c6fbf28..64e49c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -16,30 +16,20 @@ */ package org.apache.nifi.controller; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; - +import org.apache.nifi.attribute.expression.language.PreparedQuery; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileSummary; +import org.apache.nifi.controller.queue.ListFlowFileRequest; +import org.apache.nifi.controller.queue.ListFlowFileState; +import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.SortColumn; +import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -49,8 +39,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; @@ -65,6 +57,27 @@ import org.apache.nifi.util.concurrency.TimedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + /** * A FlowFileQueue is used to queue FlowFile objects that are awaiting further * processing. Must be thread safe. @@ -82,7 +95,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // guarded by lock private ArrayList<FlowFileRecord> swapQueue = null; - private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L)); + private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0, 0L)); private boolean swapMode = false; @@ -96,7 +109,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final int swapThreshold; private final FlowFileSwapManager swapManager; private final List<String> swapLocations = new ArrayList<>(); - @SuppressWarnings("unused") private final TimedLock readLock; private final TimedLock writeLock; private final String identifier; @@ -104,6 +116,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; + private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>(); + // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! private final ProcessScheduler scheduler; @@ -263,7 +278,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { if (swapMode || activeQueue.size() >= swapThreshold) { swapQueue.add(file); - incrementSwapQueueSize(1, file.getSize()); + incrementSwapQueueSize(1, file.getSize(), 0); swapMode = true; writeSwapFilesIfNecessary(); } else { @@ -291,7 +306,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { if (swapMode || activeQueue.size() >= swapThreshold - numFiles) { swapQueue.addAll(files); - incrementSwapQueueSize(numFiles, bytes); + incrementSwapQueueSize(numFiles, bytes, 0); swapMode = true; writeSwapFilesIfNecessary(); } else { @@ -450,7 +465,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { for (final FlowFileRecord flowFile : swappedIn) { swapSize += flowFile.getSize(); } - incrementSwapQueueSize(-swappedIn.size(), -swapSize); + incrementSwapQueueSize(-swappedIn.size(), -swapSize, -1); incrementActiveQueueSize(swappedIn.size(), swapSize); activeQueue.addAll(swappedIn); return; @@ -496,7 +511,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (recordsMigrated > 0) { incrementActiveQueueSize(recordsMigrated, bytesMigrated); - incrementSwapQueueSize(-recordsMigrated, -bytesMigrated); + incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0); } if (size.get().swappedCount == 0) { @@ -587,7 +602,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes; final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes, - originalSize.swappedCount + addedSwapRecords + flowFilesSwappedOut, originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut, + originalSize.swappedCount + addedSwapRecords + flowFilesSwappedOut, + originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut, + originalSize.swapFiles + numSwapFiles, originalSize.unacknowledgedCount, originalSize.unacknowledgedBytes); updated = size.compareAndSet(originalSize, newSize); } @@ -817,7 +834,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } - incrementSwapQueueSize(swapFlowFileCount, swapByteCount); + incrementSwapQueueSize(swapFlowFileCount, swapByteCount, swapLocations.size()); this.swapLocations.addAll(swapLocations); } finally { writeLock.unlock("Recover Swap Files"); @@ -832,7 +849,275 @@ public final class StandardFlowFileQueue implements FlowFileQueue { return "FlowFileQueue[id=" + identifier + "]"; } - private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); + + @Override + public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) { + return listFlowFiles(requestIdentifier, maxResults, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING); + } + + @Override + public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final SortColumn sortColumn, final SortDirection direction) { + return listFlowFiles(requestIdentifier, maxResults, null, sortColumn, direction); + } + + @Override + public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final String query, final SortColumn sortColumn, final SortDirection direction) { + final PreparedQuery preparedQuery; + if (query == null) { + preparedQuery = null; + } else { + try { + final ResultType resultType = Query.compile(query).getResultType(); + if (resultType != ResultType.BOOLEAN) { + throw new IllegalArgumentException("Invalid expression Language provided to search the listing of FlowFiles. " + + "The expression must return a 'Boolean' type but returns a " + resultType.name() + " type"); + } + preparedQuery = Query.prepare(query); + } catch (final AttributeExpressionLanguageParsingException e) { + throw new IllegalArgumentException("Invalid Expression Language provided to search the listing of FlowFiles: " + query, e); + } + } + + // purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother + if (listRequestMap.size() > 10) { + final List<String> toDrop = new ArrayList<>(); + for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) { + final ListFlowFileRequest request = entry.getValue(); + final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE; + + if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) { + toDrop.add(entry.getKey()); + } + } + + for (final String requestId : toDrop) { + listRequestMap.remove(requestId); + } + } + + // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue. + final int numSteps = 2 + size.get().swapFiles; + final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps); + + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + int position = 0; + int resultCount = 0; + final Comparator<FlowFileSummary> comparator = FlowFileSummaries.createComparator(sortColumn, direction); + final NavigableSet<FlowFileSummary> summaries = new TreeSet<>(comparator); + int completedStepCount = 0; + + // we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it + // continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular + // order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order. + writeLock.lock(); + try { + logger.debug("{} Acquired lock to perform listing of FlowFiles", StandardFlowFileQueue.this); + listRequest.setState(ListFlowFileState.CALCULATING_LIST); + final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size()); + + FlowFileRecord flowFile; + try { + while ((flowFile = activeQueue.poll()) != null) { + flowFileRecords.add(flowFile); + position++; + + if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { + summaries.add(summarize(flowFile, position)); + if (summaries.size() > maxResults) { + summaries.pollLast(); + } + } + } + } finally { + activeQueue.addAll(flowFileRecords); + } + } finally { + writeLock.unlock("List FlowFiles"); + } + + logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", StandardFlowFileQueue.this, resultCount); + + listRequest.setCompletedStepCount(++completedStepCount); + + position = activeQueue.size(); + try { + // We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since + // we are not modifying anything. + readLock.lock(); + try { + for (final String location : swapLocations) { + logger.debug("{} Performing listing of FlowFiles for Swap Location {}", StandardFlowFileQueue.this, location); + final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this); + for (final FlowFileRecord flowFile : flowFiles) { + position++; + + if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { + summaries.add(summarize(flowFile, position)); + if (summaries.size() > maxResults) { + summaries.pollLast(); + } + } + } + + listRequest.setCompletedStepCount(++completedStepCount); + } + + logger.debug("{} Performing listing of FlowFiles from Swap Queue", StandardFlowFileQueue.this); + for (final FlowFileRecord flowFile : swapQueue) { + position++; + + if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { + summaries.add(summarize(flowFile, position)); + if (summaries.size() > maxResults) { + summaries.pollLast(); + } + } + } + + listRequest.setCompletedStepCount(++completedStepCount); + } finally { + readLock.unlock("List FlowFiles"); + } + } catch (final IOException ioe) { + logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe); + listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details."); + } + + // We have now completed the listing successfully. Set the number of completed steps to the total number of steps. We may have + // skipped some steps because we have reached the maximum number of results, so we consider those steps completed. + logger.debug("{} Completed listing of FlowFiles", StandardFlowFileQueue.this); + listRequest.setCompletedStepCount(listRequest.getTotalStepCount()); + listRequest.setState(ListFlowFileState.COMPLETE); + listRequest.setFlowFileSummaries(new ArrayList<FlowFileSummary>(summaries)); + } + }, "List FlowFiles for Connection " + getIdentifier()); + t.setDaemon(true); + t.start(); + + listRequestMap.put(requestIdentifier, listRequest); + return listRequest; + } + + private FlowFileSummary summarize(final FlowFile flowFile, final int position) { + // extract all of the information that we care about into new variables rather than just + // wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to + // be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object, + // we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well, + // which can be problematic if we expect them to be swapped out. + final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final long size = flowFile.getSize(); + final Long lastQueuedTime = flowFile.getLastQueueDate(); + final long lineageStart = flowFile.getLineageStartDate(); + final boolean penalized = flowFile.isPenalized(); + + return new FlowFileSummary() { + @Override + public String getUuid() { + return uuid; + } + + @Override + public String getFilename() { + return filename; + } + + @Override + public int getPosition() { + return position; + } + + @Override + public long getSize() { + return size; + } + + @Override + public long getLastQueuedTime() { + return lastQueuedTime == null ? 0L : lastQueuedTime; + } + + @Override + public long getLineageStartDate() { + return lineageStart; + } + + @Override + public boolean isPenalized() { + return penalized; + } + }; + } + + + @Override + public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) { + return listRequestMap.get(requestIdentifier); + } + + @Override + public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) { + logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier); + final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier); + if (request != null) { + request.cancel(); + } + + return request; + } + + @Override + public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException { + if (flowFileUuid == null) { + return null; + } + + readLock.lock(); + try { + // read through all of the FlowFiles in the queue, looking for the FlowFile with the given ID + for (final FlowFileRecord flowFile : activeQueue) { + if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { + return flowFile; + } + } + + for (final FlowFileRecord flowFile : swapQueue) { + if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { + return flowFile; + } + } + + // TODO: consider using a Long flowFileId instead of a UUID, and then having the swap manager + // write out the min and max FlowFile ID's. This would allow us to then have a method: boolean isFlowFilePossiblyContained(long id) + // which can return a boolean value that can be used to determine whether or not to even call peek + for (final String swapLocation : swapLocations) { + final List<FlowFileRecord> flowFiles = swapManager.peek(swapLocation, this); + for (final FlowFileRecord flowFile : flowFiles) { + if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { + return flowFile; + } + } + } + } finally { + readLock.unlock("getFlowFile"); + } + + return null; + } + + + @Override + public void verifyCanList() throws IllegalStateException { + if (connection.getSource().isRunning()) { + throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's source is still running"); + } + + if (connection.getDestination().isRunning()) { + throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's destination is still running"); + } + } @Override public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) { @@ -899,16 +1184,15 @@ public final class StandardFlowFileQueue implements FlowFileQueue { dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); - try { - final QueueSize swapSize = size.get().swapQueueSize(); - - logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", - requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); - if (dropRequest.getState() == DropFlowFileState.CANCELED) { - logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); - return; - } + final QueueSize swapSize = size.get().swapQueueSize(); + logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", + requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); + if (dropRequest.getState() == DropFlowFileState.CANCELED) { + logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); + return; + } + try { droppedSize = drop(swapQueue, requestor); } catch (final IOException ioe) { logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); @@ -922,7 +1206,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); swapMode = false; - incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount()); + incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), 0); logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, droppedSize); final int swapFileCount = swapLocations.size(); @@ -953,7 +1237,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); - incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount()); + incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), -1); dropRequest.setCurrentSize(getQueueSize()); swapLocationItr.remove(); @@ -1129,7 +1413,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { while (!updated) { final FlowFileQueueSize original = size.get(); final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes + bytes, - original.swappedCount, original.swappedBytes, original.unacknowledgedCount, original.unacknowledgedBytes); + original.swappedCount, original.swappedBytes, original.swapFiles, original.unacknowledgedCount, original.unacknowledgedBytes); updated = size.compareAndSet(original, newSize); if (updated) { @@ -1138,12 +1422,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } - private void incrementSwapQueueSize(final int count, final long bytes) { + private void incrementSwapQueueSize(final int count, final long bytes, final int fileCount) { boolean updated = false; while (!updated) { final FlowFileQueueSize original = size.get(); final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes, - original.swappedCount + count, original.swappedBytes + bytes, original.unacknowledgedCount, original.unacknowledgedBytes); + original.swappedCount + count, original.swappedBytes + bytes, original.swapFiles + fileCount, original.unacknowledgedCount, original.unacknowledgedBytes); updated = size.compareAndSet(original, newSize); if (updated) { @@ -1157,7 +1441,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { while (!updated) { final FlowFileQueueSize original = size.get(); final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes, - original.swappedCount, original.swappedBytes, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes); + original.swappedCount, original.swappedBytes, original.swapFiles, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes); updated = size.compareAndSet(original, newSize); if (updated) { @@ -1181,15 +1465,17 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final long activeQueueBytes; private final int swappedCount; private final long swappedBytes; + private final int swapFiles; private final int unacknowledgedCount; private final long unacknowledgedBytes; - public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, + public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, final int swapFileCount, final int unacknowledgedCount, final long unacknowledgedBytes) { this.activeQueueCount = activeQueueCount; this.activeQueueBytes = activeQueueBytes; this.swappedCount = swappedCount; this.swappedBytes = swappedBytes; + this.swapFiles = swapFileCount; this.unacknowledgedCount = unacknowledgedCount; this.unacknowledgedBytes = unacknowledgedBytes; } @@ -1218,7 +1504,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { public String toString() { return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + ", " + activeQueueBytes + " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes + - " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; + " Bytes], Swap Files=[" + swapFiles + "], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 639a4c8..211baa7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -110,7 +110,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // We can then destroy the data. If we end up syncing the FlowFile Repository to the backing storage mechanism and then restart // before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed // on restart. - private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>(); + private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>(); public WriteAheadFlowFileRepository() { final NiFiProperties properties = NiFiProperties.getInstance(); @@ -169,12 +169,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis updateRepository(records, alwaysSync); } - private void markDestructable(final ContentClaim contentClaim) { - if (contentClaim == null) { - return; - } - - final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + private void markDestructable(final ResourceClaim resourceClaim) { if (resourceClaim == null) { return; } @@ -213,22 +208,22 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // However, the result of this is that the FileSystem Repository may end up trying to remove the content multiple times. // This does not, however, cause problems, as ContentRepository should handle this // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface. - final Set<ContentClaim> claimsToAdd = new HashSet<>(); + final Set<ResourceClaim> claimsToAdd = new HashSet<>(); for (final RepositoryRecord record : records) { if (record.getType() == RepositoryRecordType.DELETE) { // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) { - claimsToAdd.add(record.getCurrentClaim()); + claimsToAdd.add(record.getCurrentClaim().getResourceClaim()); } // If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable. if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) { - claimsToAdd.add(record.getOriginalClaim()); + claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); } } else if (record.getType() == RepositoryRecordType.UPDATE) { // if we have an update, and the original is no longer needed, mark original as destructable if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) { - claimsToAdd.add(record.getOriginalClaim()); + claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); } } } @@ -236,10 +231,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis if (!claimsToAdd.isEmpty()) { // Get / Register a Set<ContentClaim> for the given Partiton Index final Integer partitionKey = Integer.valueOf(partitionIndex); - BlockingQueue<ContentClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey); + BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey); if (claimQueue == null) { claimQueue = new LinkedBlockingQueue<>(); - final BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); + final BlockingQueue<ResourceClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); if (existingClaimQueue != null) { claimQueue = existingClaimQueue; } @@ -252,26 +247,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public void onSync(final int partitionIndex) { - final BlockingQueue<ContentClaim> claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex)); + final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex)); if (claimQueue == null) { return; } - final Set<ContentClaim> claimsToDestroy = new HashSet<>(); + final Set<ResourceClaim> claimsToDestroy = new HashSet<>(); claimQueue.drainTo(claimsToDestroy); - for (final ContentClaim claim : claimsToDestroy) { + for (final ResourceClaim claim : claimsToDestroy) { markDestructable(claim); } } @Override public void onGlobalSync() { - for (final BlockingQueue<ContentClaim> claimQueue : claimsAwaitingDestruction.values()) { - final Set<ContentClaim> claimsToDestroy = new HashSet<>(); + for (final BlockingQueue<ResourceClaim> claimQueue : claimsAwaitingDestruction.values()) { + final Set<ResourceClaim> claimsToDestroy = new HashSet<>(); claimQueue.drainTo(claimsToDestroy); - for (final ContentClaim claim : claimsToDestroy) { + for (final ResourceClaim claim : claimsToDestroy) { markDestructable(claim); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 09ac7f2..2a46ec6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -17,29 +17,17 @@ package org.apache.nifi.controller; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileSummary; +import org.apache.nifi.controller.queue.ListFlowFileState; +import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.SortDirection; +import org.apache.nifi.controller.queue.SortColumn; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -55,17 +43,40 @@ import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class TestStandardFlowFileQueue { private TestSwapManager swapManager = null; private StandardFlowFileQueue queue = null; private List<ProvenanceEventRecord> provRecords = new ArrayList<>(); + @BeforeClass + public static void setupLogging() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG"); + } + @Before @SuppressWarnings("unchecked") public void setup() { @@ -388,6 +399,167 @@ public class TestStandardFlowFileQueue { assertEquals(20, swapManager.swapInCalledCount); } + + @Test(timeout = 5000) + public void testListFlowFilesOnlyActiveQueue() throws InterruptedException { + for (int i = 0; i < 9999; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 10000); + assertNotNull(status); + assertEquals(9999, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(9999, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(2, status.getTotalStepCount()); + assertEquals(2, status.getCompletedStepCount()); + } + + @Test(timeout = 5000) + public void testListFlowFilesActiveQueueAndSwapQueue() throws InterruptedException { + for (int i = 0; i < 11000; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 11000); + assertNotNull(status); + assertEquals(11000, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(11000, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(2, status.getTotalStepCount()); + assertEquals(2, status.getCompletedStepCount()); + } + + @Test(timeout = 5000) + public void testListFlowFilesActiveQueueAndSwapFile() throws InterruptedException { + for (int i = 0; i < 20000; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 20000); + assertNotNull(status); + assertEquals(20000, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(20000, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(3, status.getTotalStepCount()); + assertEquals(3, status.getCompletedStepCount()); + } + + @Test(timeout = 5000) + public void testListFlowFilesActiveQueueAndSwapFilesAndSwapQueue() throws InterruptedException { + for (int i = 0; i < 30050; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 30050); + assertNotNull(status); + assertEquals(30050, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(30050, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(4, status.getTotalStepCount()); + assertEquals(4, status.getCompletedStepCount()); + } + + @Test(timeout = 5000) + public void testListFlowFilesResultsLimited() throws InterruptedException { + for (int i = 0; i < 30050; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100); + assertNotNull(status); + assertEquals(30050, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(100, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(4, status.getTotalStepCount()); + assertEquals(4, status.getCompletedStepCount()); + } + + @Test + public void testListFlowFilesSortedAscending() throws InterruptedException { + for (int i = 0; i < 30050; i++) { + queue.put(new TestFlowFile(i)); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100, SortColumn.FLOWFILE_SIZE, SortDirection.ASCENDING); + assertNotNull(status); + assertEquals(30050, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(100, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + + assertNull(status.getFailureReason()); + assertEquals(4, status.getTotalStepCount()); + assertEquals(4, status.getCompletedStepCount()); + + int counter = 0; + for (final FlowFileSummary summary : status.getFlowFileSummaries()) { + assertEquals(counter++, summary.getSize()); + } + } + + @Test + public void testListFlowFilesSortedDescending() throws InterruptedException { + for (int i = 0; i < 30050; i++) { + queue.put(new TestFlowFile(i)); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100, SortColumn.FLOWFILE_SIZE, SortDirection.DESCENDING); + assertNotNull(status); + assertEquals(30050, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(100, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + + assertNull(status.getFailureReason()); + assertEquals(4, status.getTotalStepCount()); + assertEquals(4, status.getCompletedStepCount()); + + int counter = 0; + for (final FlowFileSummary summary : status.getFlowFileSummaries()) { + assertEquals((30050 - 1 - counter++), summary.getSize()); + } + } + + private class TestSwapManager implements FlowFileSwapManager { private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>(); int swapOutCalledCount = 0; http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 73d76bd..4bc1222 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -21,6 +21,8 @@ import java.util.Date; import java.util.Set; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.queue.SortColumn; +import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.BulletinBoardDTO; @@ -33,9 +35,11 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.dto.FlowFileDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.PortDTO; @@ -119,6 +123,17 @@ public interface NiFiServiceFacade { DownloadableContent getContent(Long eventId, String uri, ContentDirection contentDirection); /** + * Gets the content for the specified flowfile in the specified connection. + * + * @param groupId group + * @param connectionId connection + * @param flowfileUuid flowfile + * @param uri uri + * @return content + */ + DownloadableContent getContent(String groupId, String connectionId, String flowfileUuid, String uri); + + /** * Retrieves provenance. * * @param queryId identifier @@ -483,6 +498,14 @@ public interface NiFiServiceFacade { ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, String groupId, ConnectionDTO connectionDTO); /** + * Determines if this connection can be listed. + * + * @param groupId group + * @param connectionId connection + */ + void verifyListQueue(String groupId, String connectionId); + + /** * Determines if this connection can be created. * * @param groupId group @@ -556,6 +579,48 @@ public interface NiFiServiceFacade { */ DropRequestDTO deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId); + /** + * Creates a new flow file listing request. + * + * @param groupId group + * @param connectionId The ID of the connection + * @param listingRequestId The ID of the listing request + * @param column sort column + * @param direction sort direction + * @return The ListingRequest + */ + ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId, SortColumn column, SortDirection direction); + + /** + * Gets a new flow file listing request. + * + * @param groupId group + * @param connectionId The ID of the connection + * @param listingRequestId The ID of the listing request + * @return The ListingRequest + */ + ListingRequestDTO getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId); + + /** + * Deletes a new flow file listing request. + * + * @param groupId group + * @param connectionId The ID of the connection + * @param listingRequestId The ID of the listing request + * @return The ListingRequest + */ + ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId); + + /** + * Gets the specified flowfile from the specified connection. + * + * @param groupId group + * @param connectionId The ID of the connection + * @param flowFileUuid The UUID of the flowfile + * @return The FlowFileDTO + */ + FlowFileDTO getFlowFile(String groupId, String connectionId, String flowFileUuid); + // ---------------------------------------- // InputPort methods // ---------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java index 2f75004..f994c52 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MultivaluedMap; import org.apache.commons.lang3.StringUtils; @@ -51,6 +53,12 @@ public class StandardNiFiContentAccess implements ContentAccess { private static final Logger logger = LoggerFactory.getLogger(StandardNiFiContentAccess.class); public static final String CLIENT_ID_PARAM = "clientId"; + private static final Pattern FLOWFILE_CONTENT_URI_PATTERN = Pattern + .compile("/controller/process-groups/((?:root)|(?:[a-f0-9\\-]{36}))/connections/([a-f0-9\\-]{36})/flowfiles/([a-f0-9\\-]{36})/content"); + + private static final Pattern PROVENANCE_CONTENT_URI_PATTERN = Pattern + .compile("/controller/provenance/events/([0-9]+)/content/((?:input)|(?:output))"); + private NiFiProperties properties; private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; @@ -115,21 +123,41 @@ public class StandardNiFiContentAccess implements ContentAccess { // create the downloadable content return new DownloadableContent(filename, contentType, clientResponse.getEntityInputStream()); } else { - // example URI: http://localhost:8080/nifi-api/controller/provenance/events/1/content/input - final String eventDetails = StringUtils.substringAfterLast(request.getDataUri(), "events/"); - final String rawEventId = StringUtils.substringBefore(eventDetails, "/content/"); - final String rawDirection = StringUtils.substringAfterLast(eventDetails, "/content/"); + // example URIs: + // http://localhost:8080/nifi-api/controller/provenance/events/{id}/content/{input|output} + // http://localhost:8080/nifi-api/controller/process-groups/{root|uuid}/connections/{uuid}/flowfiles/{uuid}/content - // get the content type - final Long eventId; - final ContentDirection direction; - try { - eventId = Long.parseLong(rawEventId); - direction = ContentDirection.valueOf(rawDirection.toUpperCase()); - } catch (final IllegalArgumentException iae) { + // get just the context path for comparison + final String dataUri = StringUtils.substringAfter(request.getDataUri(), "/nifi-api"); + if (StringUtils.isBlank(dataUri)) { throw new IllegalArgumentException("The specified data reference URI is not valid."); } - return serviceFacade.getContent(eventId, request.getDataUri(), direction); + + // flowfile listing content + final Matcher flowFileMatcher = FLOWFILE_CONTENT_URI_PATTERN.matcher(dataUri); + if (flowFileMatcher.matches()) { + final String groupId = flowFileMatcher.group(1); + final String connectionId = flowFileMatcher.group(2); + final String flowfileId = flowFileMatcher.group(3); + + return serviceFacade.getContent(groupId, connectionId, flowfileId, dataUri); + } + + // provenance event content + final Matcher provenanceMatcher = PROVENANCE_CONTENT_URI_PATTERN.matcher(dataUri); + if (provenanceMatcher.matches()) { + try { + final Long eventId = Long.parseLong(provenanceMatcher.group(1)); + final ContentDirection direction = ContentDirection.valueOf(provenanceMatcher.group(2).toUpperCase()); + + return serviceFacade.getContent(eventId, dataUri, direction); + } catch (final IllegalArgumentException iae) { + throw new IllegalArgumentException("The specified data reference URI is not valid."); + } + } + + // invalid uri + throw new IllegalArgumentException("The specified data reference URI is not valid."); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b330fd16/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index e7a3328..2f92588 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -62,6 +62,8 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.queue.SortColumn; +import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; @@ -80,6 +82,8 @@ import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.web.api.dto.FlowFileDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.security.user.NiFiUserUtils; import org.apache.nifi.user.AccountStatus; import org.apache.nifi.user.NiFiUser; @@ -217,6 +221,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // ----------------------------------------- // Verification Operations // ----------------------------------------- + + @Override + public void verifyListQueue(String groupId, String connectionId) { + connectionDAO.verifyList(groupId, connectionId); + } + @Override public void verifyCreateConnection(String groupId, ConnectionDTO connectionDTO) { connectionDAO.verifyCreate(groupId, connectionDTO); @@ -817,6 +827,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) { + return dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId)); + } + + @Override public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() { @Override @@ -1069,7 +1084,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) { - return dtoFactory.createDropRequestDTO(connectionDAO.createFileFlowDropRequest(groupId, connectionId, dropRequestId)); + return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(groupId, connectionId, dropRequestId)); + } + + @Override + public ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId, SortColumn column, SortDirection direction) { + return dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(groupId, connectionId, listingRequestId, column, direction)); } @Override @@ -1949,6 +1969,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public DownloadableContent getContent(String groupId, String connectionId, String flowFileUuid, String uri) { + return connectionDAO.getContent(groupId, connectionId, flowFileUuid, uri); + } + + @Override public DownloadableContent getContent(Long eventId, String uri, ContentDirection contentDirection) { return controllerFacade.getContent(eventId, uri, contentDirection); } @@ -2127,6 +2152,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ListingRequestDTO getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) { + return dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(groupId, connectionId, listingRequestId)); + } + + @Override + public FlowFileDTO getFlowFile(String groupId, String connectionId, String flowFileUuid) { + return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(groupId, connectionId, flowFileUuid)); + } + + @Override public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) { return controllerFacade.getConnectionStatusHistory(groupId, connectionId); }
