NIFI-730: Return DropFlowFileStatus object when calling cancel
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0af1acaa Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0af1acaa Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0af1acaa Branch: refs/heads/master Commit: 0af1acaafaf28844b31c41e276c4dbce18390acb Parents: 77f7d75 Author: Mark Payne <[email protected]> Authored: Wed Oct 14 09:46:21 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Wed Oct 14 09:46:21 2015 -0400 ---------------------------------------------------------------------- .../nifi/controller/queue/FlowFileQueue.java | 10 ++++++---- .../nifi/controller/StandardFlowFileQueue.java | 19 ++++++++++++++----- .../controller/TestStandardFlowFileQueue.java | 7 ++----- 3 files changed, 22 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0af1acaa/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 2d67d58..dcf7f13 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -199,11 +199,13 @@ public interface FlowFileQueue { DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier); /** - * Cancels the request to drop FlowFiles that has the given identifier + * Cancels the request to drop FlowFiles that has the given identifier. After this method is called, the request + * will no longer be known by this queue, so subsequent calls to {@link #getDropFlowFileStatus(String)} or + * {@link #cancelDropFlowFileRequest(String)} will return <code>null</code> * * @param requestIdentifier the identifier of the Drop FlowFile Request - * @return <code>true</code> if the request was canceled, <code>false</code> if the request has - * already completed or is not known + * @return the status for the request with the given identifier after it has been canceled, or <code>null</code> if no + * request status exists with that identifier */ - boolean cancelDropFlowFileRequest(String requestIdentifier); + DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0af1acaa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index acf2830..8085760 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -1078,7 +1078,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue { builder.setLineageStartDate(flowFile.getLineageStartDate()); builder.setComponentId(getIdentifier()); builder.setComponentType("Connection"); - builder.setDetails("FlowFile manually dropped; request made by " + requestor); + builder.setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap()); + builder.setDetails("Manually dropped by " + requestor); + builder.setSourceQueueIdentifier(getIdentifier()); + + final ContentClaim contentClaim = flowFile.getContentClaim(); + if (contentClaim != null) { + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize()); + } + return builder.build(); } @@ -1138,14 +1147,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override - public boolean cancelDropFlowFileRequest(final String requestIdentifier) { + public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) { final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier); if (request == null) { - return false; + return null; } - final boolean successful = request.cancel(); - return successful; + request.cancel(); + return request; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/0af1acaa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 3789ea5..4b67d91 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -156,7 +156,7 @@ public class TestStandardFlowFileQueue { queue.poll(exp); } - @Test + @Test(timeout = 20000) public void testDropSwappedFlowFiles() { for (int i = 1; i <= 210000; i++) { queue.put(new TestFlowFile()); @@ -165,15 +165,12 @@ public class TestStandardFlowFileQueue { assertEquals(20, swapManager.swappedOut.size()); final DropFlowFileStatus status = queue.dropFlowFiles("1", "Unit Test"); while (status.getState() != DropFlowFileState.COMPLETE) { - final QueueSize queueSize = queue.size(); - System.out.println(queueSize); try { - Thread.sleep(1000L); + Thread.sleep(100L); } catch (final Exception e) { } } - System.out.println(queue.size()); assertEquals(0, queue.size().getObjectCount()); assertEquals(0, queue.size().getByteCount()); assertEquals(0, swapManager.swappedOut.size());
