Repository: nifi
Updated Branches:
  refs/heads/master 99c7fe3b4 -> 62333c9e0
NIFI-1572: Ensure that if an Exception is thrown when processing a bin, all sessions involved are rolled back or otherwise accounted for

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1149bc61
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1149bc61
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1149bc61

Branch: refs/heads/master
Commit: 1149bc61cb3390210be4566dafba80564c4ab8d9
Parents: 99c7fe3
Author: Mark Payne <[email protected]>
Authored: Mon Feb 29 10:43:27 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Feb 29 15:42:33 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/processor/util/bin/BinFiles.java | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/1149bc61/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 62e5655..7c90342 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -230,6 +230,14 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
                     }
                     session.rollback();
                     return 1;
+                } catch (final Exception e) {
+                    logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {binCopy.size(), e});
+
+                    for (final FlowFileSessionWrapper wrapper : binCopy) {
+                        wrapper.getSession().rollback();
+                    }
+                    session.rollback();
+                    return 1;
                 }
// we first commit the bundle's session before the originals' sessions because if we are restarted or crash
