Repository: nifi
Updated Branches:
  refs/heads/master 99c7fe3b4 -> 62333c9e0


NIFI-1572: Ensure that if an Exception is thrown when processing a bin, all 
sessions involved are rolled back or otherwise accounted for


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1149bc61
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1149bc61
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1149bc61

Branch: refs/heads/master
Commit: 1149bc61cb3390210be4566dafba80564c4ab8d9
Parents: 99c7fe3
Author: Mark Payne <[email protected]>
Authored: Mon Feb 29 10:43:27 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Feb 29 15:42:33 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/processor/util/bin/BinFiles.java    | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1149bc61/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 62e5655..7c90342 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -230,6 +230,14 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
             }
             session.rollback();
             return 1;
+        } catch (final Exception e) {
+            logger.error("Failed to process bundle of {} files due to {}; 
rolling back sessions", new Object[] {binCopy.size(), e});
+
+            for (final FlowFileSessionWrapper wrapper : binCopy) {
+                wrapper.getSession().rollback();
+            }
+            session.rollback();
+            return 1;
         }
 
         // we first commit the bundle's session before the originals' sessions 
because if we are restarted or crash

Reply via email to