Repository: nifi Updated Branches: refs/heads/NIFI-108 ab30bf046 -> 6d64f58d4
NIFI-108: Added unit tests; added verifyCanList method to queue; fixed bugs Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e18038ac Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e18038ac Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e18038ac Branch: refs/heads/NIFI-108 Commit: e18038ac2f130979fd96f557e1bdbb6d99abf42a Parents: b12aba7 Author: Mark Payne <[email protected]> Authored: Thu Dec 17 17:31:59 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Thu Dec 17 17:31:59 2015 -0500 ---------------------------------------------------------------------- .../nifi/controller/queue/FlowFileQueue.java | 7 ++ .../cluster/manager/impl/WebClusterManager.java | 12 +- .../controller/queue/ListFlowFileRequest.java | 2 +- .../nifi/controller/StandardFlowFileQueue.java | 38 +++++- .../controller/TestStandardFlowFileQueue.java | 118 +++++++++++++++++++ 5 files changed, 165 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index dbf2f04..0d0f03f 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -298,4 +298,11 @@ public interface FlowFileQueue { * @throws IOException if unable to read FlowFiles that are stored on some external device */ FlowFileRecord getFlowFile(String flowFileUuid) throws IOException; + + /** + * Ensures that a listing can be performed on the queue + * + * @throws IllegalStateException if the queue is not in a state in which a listing can be performed + */ + void verifyCanList() throws IllegalStateException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 210cf52..6cd95b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -2854,14 +2854,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator); ListFlowFileState state = null; - int sumOfPercents = 0; + int numStepsCompleted = 0; + int numStepsTotal = 0; boolean finished = true; for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : listingRequestMap.entrySet()) { final ListingRequestDTO nodeRequest = entry.getValue(); - Integer percentComplete = nodeRequest.getPercentCompleted(); - if (percentComplete != null) { - sumOfPercents += percentComplete; - } + + numStepsCompleted += nodeRequest.getCompletedStepCount(); + numStepsTotal += nodeRequest.getTotalStepCount(); if (!nodeRequest.getFinished()) { finished = false; @@ -2895,7 +2895,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries); listingRequest.setFlowFileSummaries(summaryDTOs); - final int percentCompleted = sumOfPercents / listingRequestMap.size(); + final int percentCompleted = numStepsCompleted / numStepsTotal; listingRequest.setPercentCompleted(percentCompleted); listingRequest.setFinished(finished); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java index aad4c4f..313ad0c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java @@ -30,7 +30,7 @@ public class ListFlowFileRequest implements ListFlowFileStatus { private final long submissionTime = System.currentTimeMillis(); private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>(); - private ListFlowFileState state; + private ListFlowFileState state = ListFlowFileState.WAITING_FOR_LOCK; private String failureReason; private int numSteps; private int completedStepCount; http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index daaa763..24fd71e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -915,6 +915,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order. writeLock.lock(); try { + logger.debug("{} Acquired lock to perform listing of FlowFiles", StandardFlowFileQueue.this); + listRequest.setState(ListFlowFileState.CALCULATING_LIST); final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size()); FlowFileRecord flowFile; @@ -926,6 +928,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { summaries.add(summarize(flowFile, position)); if (++resultCount >= maxResults) { + logger.debug("{} Reached max number of results of {} from active queue; listing complete", StandardFlowFileQueue.this, maxResults); break; } } @@ -937,16 +940,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue { writeLock.unlock("List FlowFiles"); } + logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", StandardFlowFileQueue.this, resultCount); + listRequest.setCompletedStepCount(++completedStepCount); - position = activeQueue.size(); - sourceLoop: while (resultCount < maxResults) { - try { + if (summaries.size() < maxResults) { + position = activeQueue.size(); + sourceLoop: try { // We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since // we are not modifying anything. readLock.lock(); try { for (final String location : swapLocations) { + logger.debug("{} Performing listing of FlowFiles for Swap Location {}", StandardFlowFileQueue.this, location); final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this); for (final FlowFileRecord flowFile : flowFiles) { position++; @@ -954,6 +960,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { summaries.add(summarize(flowFile, position)); if (++resultCount >= maxResults) { + logger.debug("{} Reached max number of results of {}; listing complete", StandardFlowFileQueue.this, maxResults); break sourceLoop; } } @@ -962,12 +969,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue { listRequest.setCompletedStepCount(++completedStepCount); } + logger.debug("{} Performing listing of FlowFiles from Swap Queue", StandardFlowFileQueue.this); for (final FlowFileRecord flowFile : swapQueue) { position++; if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { summaries.add(summarize(flowFile, position)); if (++resultCount >= maxResults) { + logger.debug("{} Reached max number of results of {}; listing complete", StandardFlowFileQueue.this, maxResults); break sourceLoop; } } @@ -977,12 +986,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } finally { readLock.unlock("List FlowFiles"); } + + break sourceLoop; } catch (final IOException ioe) { logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe); listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details."); } } + // We have now completed the listing successfully. Set the number of completed steps to the total number of steps. We may have + // skipped some steps because we have reached the maximum number of results, so we consider those steps completed. + logger.debug("{} Completed listing of FlowFiles", StandardFlowFileQueue.this); + listRequest.setCompletedStepCount(listRequest.getTotalStepCount()); + listRequest.setState(ListFlowFileState.COMPLETE); listRequest.setFlowFileSummaries(summaries); } }, "List FlowFiles for Connection " + getIdentifier()); @@ -1002,7 +1018,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key()); final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); final long size = flowFile.getSize(); - final long lastQueuedTime = flowFile.getLastQueueDate(); + final Long lastQueuedTime = flowFile.getLastQueueDate(); final long lineageStart = flowFile.getLineageStartDate(); final boolean penalized = flowFile.isPenalized(); @@ -1029,7 +1045,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public long getLastQueuedTime() { - return lastQueuedTime; + return lastQueuedTime == null ? 0L : lastQueuedTime; } @Override @@ -1100,6 +1116,18 @@ public final class StandardFlowFileQueue implements FlowFileQueue { return null; } + + @Override + public void verifyCanList() throws IllegalStateException { + if (connection.getSource().isRunning()) { + throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's source is still running"); + } + + if (connection.getDestination().isRunning()) { + throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's destination is still running"); + } + } + @Override public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) { logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier); http://git-wip-us.apache.org/repos/asf/nifi/blob/e18038ac/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 09ac7f2..f58d4b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -39,6 +39,8 @@ import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.ListFlowFileState; +import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; @@ -55,17 +57,27 @@ import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.sun.istack.logging.Logger; + +import ch.qos.logback.classic.BasicConfigurator; + public class TestStandardFlowFileQueue { private TestSwapManager swapManager = null; private StandardFlowFileQueue queue = null; private List<ProvenanceEventRecord> provRecords = new ArrayList<>(); + @BeforeClass + public static void setupLogging() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG"); + } + @Before @SuppressWarnings("unchecked") public void setup() { @@ -388,6 +400,112 @@ public class TestStandardFlowFileQueue { assertEquals(20, swapManager.swapInCalledCount); } + + @Test(timeout = 5000) + public void testListFlowFilesOnlyActiveQueue() throws InterruptedException { + for (int i = 0; i < 9999; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 10000); + assertNotNull(status); + assertEquals(9999, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(9999, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(2, status.getTotalStepCount()); + assertEquals(2, status.getCompletedStepCount()); + } + + @Test(timeout = 5000) + public void testListFlowFilesActiveQueueAndSwapQueue() throws InterruptedException { + for (int i = 0; i < 11000; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 11000); + assertNotNull(status); + assertEquals(11000, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(11000, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(2, status.getTotalStepCount()); + assertEquals(2, status.getCompletedStepCount()); + } + + @Test(timeout = 5000) + public void testListFlowFilesActiveQueueAndSwapFile() throws InterruptedException { + for (int i = 0; i < 20000; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 20000); + assertNotNull(status); + assertEquals(20000, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(20000, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(3, status.getTotalStepCount()); + assertEquals(3, status.getCompletedStepCount()); + } + + @Test(timeout = 5000) + public void testListFlowFilesActiveQueueAndSwapFilesAndSwapQueue() throws InterruptedException { + for (int i = 0; i < 30050; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 30050); + assertNotNull(status); + assertEquals(30050, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(30050, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(4, status.getTotalStepCount()); + assertEquals(4, status.getCompletedStepCount()); + } + + @Test(timeout = 5000000) + public void testListFlowFilesResultsLimited() throws InterruptedException { + for (int i = 0; i < 30050; i++) { + queue.put(new TestFlowFile()); + } + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100); + assertNotNull(status); + assertEquals(30050, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(100, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + assertEquals(4, status.getTotalStepCount()); + assertEquals(4, status.getCompletedStepCount()); + } + private class TestSwapManager implements FlowFileSwapManager { private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>(); int swapOutCalledCount = 0;
