Repository: nifi
Updated Branches:
  refs/heads/master 85cb5dd1e -> c68da68dc


NIFI-893: Ensure that if session.commit fails, previously 'checkpointed' 
sessions are rolled back


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fbec28ba
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fbec28ba
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fbec28ba

Branch: refs/heads/master
Commit: fbec28bad93a1263b2fe4da74ec597c0d35996fe
Parents: 4baffac
Author: Mark Payne <[email protected]>
Authored: Thu Aug 27 15:17:55 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Aug 27 15:17:55 2015 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      |  34 +++++-
 .../repository/TestStandardProcessSession.java  | 109 ++++++++++++++++++-
 2 files changed, 136 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fbec28ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0e08325..3ba7e4e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -337,9 +337,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 final Collection<StandardRepositoryRecord> repoRecords = 
checkpoint.records.values();
                 context.getFlowFileRepository().updateRepository((Collection) 
repoRecords);
             } catch (final IOException ioe) {
-                rollback();
+                // if we fail to commit the session, we need to roll back
+                // the checkpoints as well because none of the checkpoints
+                // were ever committed.
+                rollback(false, true);
                 throw new ProcessException("FlowFile Repository failed to 
update", ioe);
             }
+
             final long flowFileRepoUpdateFinishNanos = System.nanoTime();
             final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos 
- claimRemovalFinishNanos;
 
@@ -422,7 +426,10 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             }
         } catch (final Exception e) {
             try {
-                rollback();
+                // if we fail to commit the session, we need to roll back
+                // the checkpoints as well because none of the checkpoints
+                // were ever committed.
+                rollback(false, true);
             } catch (final Exception e1) {
                 e.addSuppressed(e1);
             }
@@ -849,8 +856,23 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void rollback(final boolean penalize) {
+        rollback(penalize, false);
+    }
+
+    private void rollback(final boolean penalize, final boolean 
rollbackCheckpoint) {
         deleteOnCommit.clear();
-        if (records.isEmpty()) {
+
+        final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>();
+        recordsToHandle.addAll(records.values());
+        if (rollbackCheckpoint) {
+            final Checkpoint existingCheckpoint = this.checkpoint;
+            this.checkpoint = null;
+            if (existingCheckpoint != null && existingCheckpoint.records != 
null) {
+                recordsToHandle.addAll(existingCheckpoint.records.values());
+            }
+        }
+
+        if (recordsToHandle.isEmpty()) {
             LOG.trace("{} was rolled back, but no events were performed by 
this ProcessSession", this);
             acknowledgeRecords();
             return;
@@ -859,14 +881,14 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         resetWriteClaims();
         resetReadClaim();
 
-        for (final StandardRepositoryRecord record : records.values()) {
-            // remove the working claim if it's different than the original.
+        for (final StandardRepositoryRecord record : recordsToHandle) {
+            // remove the working claims if they are different than the 
originals.
             removeTemporaryClaim(record);
         }
 
         final Set<RepositoryRecord> abortedRecords = new HashSet<>();
         final Set<StandardRepositoryRecord> transferRecords = new HashSet<>();
-        for (final StandardRepositoryRecord record : records.values()) {
+        for (final StandardRepositoryRecord record : recordsToHandle) {
             if (record.isMarkedForAbort()) {
                 removeContent(record.getWorkingClaim());
                 if (record.getCurrentClaim() != null && 
!record.getCurrentClaim().equals(record.getWorkingClaim())) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fbec28ba/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index ba34148..68aa389 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -829,6 +829,106 @@ public class TestStandardProcessSession {
         }
     }
 
+
+    @Test
+    public void testCommitFailureRequeuesFlowFiles() {
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+
+        .contentClaimOffset(0L).size(0L).build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile originalFlowFile = session.get();
+        assertTrue(flowFileQueue.isActiveQueueEmpty());
+        assertEquals(1, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+
+        final FlowFile modified = session.write(originalFlowFile, new 
OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write("Hello".getBytes());
+            }
+        });
+
+        session.transfer(modified);
+
+        // instruct flowfile repo to throw IOException on update
+        flowFileRepo.setFailOnUpdate(true);
+
+        try {
+            session.commit();
+            Assert.fail("Session commit completed, even though FlowFile Repo 
threw IOException");
+        } catch (final ProcessException pe) {
+            // expected behavior because FlowFile Repo will throw IOException
+        }
+
+        assertFalse(flowFileQueue.isActiveQueueEmpty());
+        assertEquals(1, flowFileQueue.size().getObjectCount());
+        assertEquals(0, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testRollbackAfterCheckpoint() {
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+
+        .contentClaimOffset(0L).size(0L).build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile originalFlowFile = session.get();
+        assertTrue(flowFileQueue.isActiveQueueEmpty());
+        assertEquals(1, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+
+        final FlowFile modified = session.write(originalFlowFile, new 
OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write("Hello".getBytes());
+            }
+        });
+
+        session.transfer(modified);
+
+        session.checkpoint();
+        assertTrue(flowFileQueue.isActiveQueueEmpty());
+
+        session.rollback();
+        assertTrue(flowFileQueue.isActiveQueueEmpty());
+        assertEquals(0, flowFileQueue.size().getObjectCount());
+        assertEquals(0, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+
+        session.rollback();
+
+        flowFileQueue.put(flowFileRecord);
+        assertFalse(flowFileQueue.isActiveQueueEmpty());
+
+        final FlowFile originalRound2 = session.get();
+        assertTrue(flowFileQueue.isActiveQueueEmpty());
+        assertEquals(1, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+
+        final FlowFile modifiedRound2 = session.write(originalRound2, new 
OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write("Hello".getBytes());
+            }
+        });
+
+        session.transfer(modifiedRound2);
+
+        session.checkpoint();
+        assertTrue(flowFileQueue.isActiveQueueEmpty());
+        assertEquals(1, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+
+        session.commit();
+
+        // FlowFile transferred back to queue
+        assertEquals(1, flowFileQueue.size().getObjectCount());
+        assertEquals(0, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+        assertFalse(flowFileQueue.isActiveQueueEmpty());
+    }
+
     @Test
     public void testCreateEmitted() throws IOException {
         final FlowFile newFlowFile = session.create();
@@ -910,9 +1010,13 @@ public class TestStandardProcessSession {
     }
 
     private static class MockFlowFileRepository implements FlowFileRepository {
-
+        private boolean failOnUpdate = false;
         private final AtomicLong idGenerator = new AtomicLong(0L);
 
+        public void setFailOnUpdate(final boolean fail) {
+            this.failOnUpdate = fail;
+        }
+
         @Override
         public void close() throws IOException {
         }
@@ -929,6 +1033,9 @@ public class TestStandardProcessSession {
 
         @Override
         public void updateRepository(Collection<RepositoryRecord> records) 
throws IOException {
+            if (failOnUpdate) {
+                throw new IOException("FlowFile Repository told to fail on 
update for unit test");
+            }
         }
 
         @Override

Reply via email to