NIFI-495: Fixed handling of FlowFiles when the destination is full by rolling
back the session


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7819afbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7819afbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7819afbe

Branch: refs/heads/develop
Commit: 7819afbefd980ce68f43093302997024926d9f51
Parents: 8d20b82
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Apr 8 13:38:33 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Apr 8 13:38:33 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/remote/StandardRemoteGroupPort.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7819afbe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 740e405..69ba0fd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -171,6 +171,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             this.targetRunning.set(false);
             final String message = String.format("%s failed to communicate 
with %s because the remote instance indicates that the port is not in a valid 
state", this, url);
             logger.error(message);
+               session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
         } catch (final UnknownPortException e) {
@@ -178,6 +179,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             this.targetExists.set(false);
             final String message = String.format("%s failed to communicate 
with %s because the remote instance indicates that the port no longer exists", 
this, url);
             logger.error(message);
+               session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
         } catch (final IOException e) {
@@ -186,13 +188,14 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             if ( logger.isDebugEnabled() ) {
                 logger.error("", e);
             }
+               session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
-            session.rollback();
             return;
         }
         
         if ( transaction == null ) {
             logger.debug("{} Unable to create transaction to communicate with; 
all peers must be penalized, so yielding context", this);
+            session.rollback();
             context.yield();
             return;
         }

Reply via email to