NIFI-730: Added error messages if we fail to drop FlowFiles from queue
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afb76afc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afb76afc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afb76afc Branch: refs/heads/master Commit: afb76afcd0fd7d0c144a37621fdabc181bd42307 Parents: 72ff2a2 Author: Mark Payne <[email protected]> Authored: Tue Oct 13 15:57:18 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Tue Oct 13 15:57:18 2015 -0400 ---------------------------------------------------------------------- .../controller/queue/DropFlowFileStatus.java | 5 +++ .../nifi/controller/DropFlowFileRequest.java | 11 +++++ .../nifi/controller/StandardFlowFileQueue.java | 45 ++++++++++++++++---- 3 files changed, 53 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java index 7d5b9c2..737fbe3 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java @@ -70,4 +70,9 @@ public interface DropFlowFileStatus { * @return the current state of the operation */ DropFlowFileState getState(); + + /** + * @return the reason that the state is set to a Failure state, or <code>null</code> if the state is not {@link DropFlowFileState#FAILURE}. + */ + String getFailureReason(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java index 4104308..189fe7d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java @@ -30,6 +30,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus { private volatile QueueSize droppedSize = new QueueSize(0, 0L); private volatile long lastUpdated = System.currentTimeMillis(); private volatile Thread executionThread; + private volatile String failureReason; private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK; @@ -85,8 +86,18 @@ public class DropFlowFileRequest implements DropFlowFileStatus { return lastUpdated; } + @Override + public String getFailureReason() { + return failureReason; + } + synchronized void setState(final DropFlowFileState state) { + setState(state, null); + } + + synchronized void setState(final DropFlowFileState state, final String explanation) { this.state = state; + this.failureReason = explanation; this.lastUpdated = System.currentTimeMillis(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 82c1c7e..5b137f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -928,14 +928,34 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue); - QueueSize droppedSize = drop(activeQueueRecords, requestor); + + QueueSize droppedSize; + try { + droppedSize = drop(activeQueueRecords, requestor); + } catch (final IOException ioe) { + logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); + logger.error("", ioe); + + dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString()); + return; + } + activeQueue.clear(); activeQueueContentSize = 0; activeQueueSizeRef.set(0); dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); - droppedSize = drop(swapQueue, requestor); + try { + droppedSize = drop(swapQueue, requestor); + } catch (final IOException ioe) { + logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); + logger.error("", ioe); + + dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString()); + return; + } + swapQueue.clear(); dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); @@ -946,12 +966,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final Iterator<String> swapLocationItr = swapLocations.iterator(); while (swapLocationItr.hasNext()) { final String swapLocation = swapLocationItr.next(); - final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); + + List<FlowFileRecord> swappedIn = null; try { + swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); droppedSize = drop(swappedIn, requestor); - } catch (final Exception e) { - activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. - throw e; + } catch (final IOException ioe) { + logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}", + swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); + logger.error("", ioe); + + dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString()); + if (swappedIn != null) { + activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. + } + + return; } dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); @@ -963,8 +993,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { dropRequest.setState(DropFlowFileState.COMPLETE); } catch (final Exception e) { - // TODO: Handle adequately - dropRequest.setState(DropFlowFileState.FAILURE); + dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString()); } } finally { writeLock.unlock("Drop FlowFiles");
