NIFI-495: Fixed handling of FlowFiles if destination full by rolling back session
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7819afbe Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7819afbe Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7819afbe Branch: refs/heads/develop Commit: 7819afbefd980ce68f43093302997024926d9f51 Parents: 8d20b82 Author: Mark Payne <marka...@hotmail.com> Authored: Wed Apr 8 13:38:33 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Apr 8 13:38:33 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/nifi/remote/StandardRemoteGroupPort.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7819afbe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 740e405..69ba0fd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -171,6 +171,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { this.targetRunning.set(false); final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url); logger.error(message); + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } catch (final UnknownPortException e) { @@ -178,6 +179,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { this.targetExists.set(false); final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url); logger.error(message); + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } catch (final IOException e) { @@ -186,13 +188,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { if ( logger.isDebugEnabled() ) { logger.error("", e); } + session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); - session.rollback(); return; } if ( transaction == null ) { logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this); + session.rollback(); context.yield(); return; }