NIFI-730: Added error messages if we fail to drop FlowFiles from queue

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afb76afc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afb76afc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afb76afc

Branch: refs/heads/master
Commit: afb76afcd0fd7d0c144a37621fdabc181bd42307
Parents: 72ff2a2
Author: Mark Payne <[email protected]>
Authored: Tue Oct 13 15:57:18 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Oct 13 15:57:18 2015 -0400

----------------------------------------------------------------------
 .../controller/queue/DropFlowFileStatus.java    |  5 +++
 .../nifi/controller/DropFlowFileRequest.java    | 11 +++++
 .../nifi/controller/StandardFlowFileQueue.java  | 45 ++++++++++++++++----
 3 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index 7d5b9c2..737fbe3 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -70,4 +70,9 @@ public interface DropFlowFileStatus {
      * @return the current state of the operation
      */
     DropFlowFileState getState();
+
+    /**
+     * @return the reason that the state is set to a Failure state, or <code>null</code> if the state is not {@link DropFlowFileState#FAILURE}.
+     */
+    String getFailureReason();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 4104308..189fe7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -30,6 +30,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
     private volatile QueueSize droppedSize = new QueueSize(0, 0L);
     private volatile long lastUpdated = System.currentTimeMillis();
     private volatile Thread executionThread;
+    private volatile String failureReason;
 
     private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
 
@@ -85,8 +86,18 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
         return lastUpdated;
     }
 
+    @Override
+    public String getFailureReason() {
+        return failureReason;
+    }
+
     synchronized void setState(final DropFlowFileState state) {
+        setState(state, null);
+    }
+
+    synchronized void setState(final DropFlowFileState state, final String 
explanation) {
         this.state = state;
+        this.failureReason = explanation;
         this.lastUpdated = System.currentTimeMillis();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 82c1c7e..5b137f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -928,14 +928,34 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                     try {
                         final List<FlowFileRecord> activeQueueRecords = new 
ArrayList<>(activeQueue);
-                        QueueSize droppedSize = drop(activeQueueRecords, 
requestor);
+
+                        QueueSize droppedSize;
+                        try {
+                            droppedSize = drop(activeQueueRecords, requestor);
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to drop the FlowFiles from 
queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), 
ioe.toString());
+                            logger.error("", ioe);
+
+                            dropRequest.setState(DropFlowFileState.FAILURE, 
"Failed to drop FlowFiles due to " + ioe.toString());
+                            return;
+                        }
+
                         activeQueue.clear();
                         activeQueueContentSize = 0;
                         activeQueueSizeRef.set(0);
                         dropRequest.setCurrentSize(getQueueSize());
                         
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
 
-                        droppedSize = drop(swapQueue, requestor);
+                        try {
+                            droppedSize = drop(swapQueue, requestor);
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to drop the FlowFiles from 
queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), 
ioe.toString());
+                            logger.error("", ioe);
+
+                            dropRequest.setState(DropFlowFileState.FAILURE, 
"Failed to drop FlowFiles due to " + ioe.toString());
+                            return;
+                        }
+
                         swapQueue.clear();
                         dropRequest.setCurrentSize(getQueueSize());
                         
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -946,12 +966,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         final Iterator<String> swapLocationItr = 
swapLocations.iterator();
                         while (swapLocationItr.hasNext()) {
                             final String swapLocation = swapLocationItr.next();
-                            final List<FlowFileRecord> swappedIn = 
swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
+
+                            List<FlowFileRecord> swappedIn = null;
                             try {
+                                swappedIn = swapManager.swapIn(swapLocation, 
StandardFlowFileQueue.this);
                                 droppedSize = drop(swappedIn, requestor);
-                            } catch (final Exception e) {
-                                activeQueue.addAll(swappedIn); // ensure that 
we don't lose the FlowFiles from our queue.
-                                throw e;
+                            } catch (final IOException ioe) {
+                                logger.error("Failed to swap in FlowFiles from 
Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
+                                    swapLocation, 
StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                                logger.error("", ioe);
+
+                                
dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles 
from Swap File " + swapLocation + " due to " + ioe.toString());
+                                if (swappedIn != null) {
+                                    activeQueue.addAll(swappedIn); // ensure 
that we don't lose the FlowFiles from our queue.
+                                }
+
+                                return;
                             }
 
                             
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -963,8 +993,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                         dropRequest.setState(DropFlowFileState.COMPLETE);
                     } catch (final Exception e) {
-                        // TODO: Handle adequately
-                        dropRequest.setState(DropFlowFileState.FAILURE);
+                        dropRequest.setState(DropFlowFileState.FAILURE, 
"Failed to drop FlowFiles due to " + e.toString());
                     }
                 } finally {
                     writeLock.unlock("Drop FlowFiles");

Reply via email to