Repository: nifi
Updated Branches:
  refs/heads/master a264c49d8 -> 8e5347156


NIFI-841: Ensure that IOExceptions on session commit are handled properly

Signed-off-by: joewitt <[email protected]>
Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/35439db3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/35439db3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/35439db3

Branch: refs/heads/master
Commit: 35439db347a05376e29c65363cbe47a122ce840c
Parents: a264c49
Author: Mark Payne <[email protected]>
Authored: Tue Aug 11 16:19:47 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Aug 18 20:06:41 2015 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 303 ++++++++++---------
 1 file changed, 165 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/35439db3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 2e33c22..fe7f125 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -292,138 +292,148 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     private void commit(final Checkpoint checkpoint) {
-        final long commitStartNanos = System.nanoTime();
-
-        resetWriteClaims();
-        resetReadClaim();
-
-        final long updateProvenanceStart = System.nanoTime();
-        updateProvenanceRepo(checkpoint);
-
-        final long claimRemovalStart = System.nanoTime();
-        final long updateProvenanceNanos = claimRemovalStart - 
updateProvenanceStart;
-
-        // Figure out which content claims can be released.
-        // At this point, we will decrement the Claimant Count for the claims 
via the Content Repository.
-        // We do not actually destroy the content because otherwise, we could 
remove the
-        // Original Claim and crash/restart before the FlowFileRepository is 
updated. This will result in the FlowFile being restored such that
-        // the content claim points to the Original Claim -- which has already 
been removed!
-        for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : 
checkpoint.records.entrySet()) {
-            final FlowFile flowFile = entry.getKey();
-            final StandardRepositoryRecord record = entry.getValue();
-
-            if (record.isMarkedForDelete()) {
-                // if the working claim is not the same as the original claim, 
we can immediately destroy the working claim
-                // because it was created in this session and is to be 
deleted. We don't need to wait for the FlowFile Repo to sync.
-                removeContent(record.getWorkingClaim());
-
-                if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getWorkingClaim())) {
-                    // if working & original claim are same, don't remove 
twice; we only want to remove the original
-                    // if it's different from the working. Otherwise, we 
remove two claimant counts. This causes
-                    // an issue if we only updated the FlowFile attributes.
-                    removeContent(record.getOriginalClaim());
-                }
-                final long flowFileLife = System.currentTimeMillis() - 
flowFile.getEntryDate();
-                final Connectable connectable = context.getConnectable();
-                final Object terminator = connectable instanceof ProcessorNode 
? ((ProcessorNode) connectable).getProcessor() : connectable;
-                LOG.info("{} terminated by {}; life of FlowFile = {} ms", new 
Object[]{flowFile, terminator, flowFileLife});
-            } else if (record.isWorking() && record.getWorkingClaim() != 
record.getOriginalClaim()) {
-                //records which have been updated - remove original if exists
-                removeContent(record.getOriginalClaim());
-            }
-        }
-
-        final long claimRemovalFinishNanos = System.nanoTime();
-        final long claimRemovalNanos = claimRemovalFinishNanos - 
claimRemovalStart;
-
-        // Update the FlowFile Repository
-        try {
-            final Collection<StandardRepositoryRecord> repoRecords = 
checkpoint.records.values();
-            context.getFlowFileRepository().updateRepository((Collection) 
repoRecords);
-        } catch (final IOException ioe) {
-            rollback();
-            throw new ProcessException("FlowFile Repository failed to update", 
ioe);
-        }
-        final long flowFileRepoUpdateFinishNanos = System.nanoTime();
-        final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - 
claimRemovalFinishNanos;
-
-        updateEventRepository(checkpoint);
-
-        final long updateEventRepositoryFinishNanos = System.nanoTime();
-        final long updateEventRepositoryNanos = 
updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
-
-        // transfer the flowfiles to the connections' queues.
-        final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new 
HashMap<>();
-        for (final StandardRepositoryRecord record : 
checkpoint.records.values()) {
-            if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
-                continue; //these don't need to be transferred
-            }
-            // record.getCurrent() will return null if this record was created 
in this session --
-            // in this case, we just ignore it, and it will be cleaned up by 
clearing the records map.
-            if (record.getCurrent() != null) {
-                Collection<FlowFileRecord> collection = 
recordMap.get(record.getDestination());
-                if (collection == null) {
-                    collection = new ArrayList<>();
-                    recordMap.put(record.getDestination(), collection);
-                }
-                collection.add(record.getCurrent());
-            }
-        }
-
-        for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry 
: recordMap.entrySet()) {
-            entry.getKey().putAll(entry.getValue());
-        }
-
-        final long enqueueFlowFileFinishNanos = System.nanoTime();
-        final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - 
updateEventRepositoryFinishNanos;
-
-        // Delete any files from disk that need to be removed.
-        for (final Path path : checkpoint.deleteOnCommit) {
-            try {
-                Files.deleteIfExists(path);
-            } catch (final IOException e) {
-                throw new FlowFileAccessException("Unable to delete " + 
path.toFile().getAbsolutePath(), e);
-            }
-        }
-        checkpoint.deleteOnCommit.clear();
-
-        if (LOG.isInfoEnabled()) {
-            final String sessionSummary = summarizeEvents(checkpoint);
-            if (!sessionSummary.isEmpty()) {
-                LOG.info("{} for {}, committed the following events: {}", new 
Object[]{this, connectableDescription, sessionSummary});
-            }
-        }
-
-        for (final Map.Entry<String, Long> entry : 
checkpoint.localCounters.entrySet()) {
-            adjustCounter(entry.getKey(), entry.getValue(), true);
-        }
-
-        for (final Map.Entry<String, Long> entry : 
checkpoint.globalCounters.entrySet()) {
-            adjustCounter(entry.getKey(), entry.getValue(), true);
-        }
-
-        acknowledgeRecords();
-        resetState();
-
-        if (LOG.isDebugEnabled()) {
-            final StringBuilder timingInfo = new StringBuilder();
-            timingInfo.append("Session commit for ").append(this).append(" 
[").append(connectableDescription).append("]").append(" took ");
-
-            final long commitNanos = System.nanoTime() - commitStartNanos;
-            formatNanos(commitNanos, timingInfo);
-            timingInfo.append("; FlowFile Repository Update took ");
-            formatNanos(flowFileRepoUpdateNanos, timingInfo);
-            timingInfo.append("; Claim Removal took ");
-            formatNanos(claimRemovalNanos, timingInfo);
-            timingInfo.append("; FlowFile Event Update took ");
-            formatNanos(updateEventRepositoryNanos, timingInfo);
-            timingInfo.append("; Enqueuing FlowFiles took ");
-            formatNanos(enqueueFlowFileNanos, timingInfo);
-            timingInfo.append("; Updating Provenance Event Repository took ");
-            formatNanos(updateProvenanceNanos, timingInfo);
-
-            LOG.debug(timingInfo.toString());
-        }
+       try {
+               final long commitStartNanos = System.nanoTime();
+       
+               resetWriteClaims(false);
+               resetReadClaim();
+       
+               final long updateProvenanceStart = System.nanoTime();
+               updateProvenanceRepo(checkpoint);
+       
+               final long claimRemovalStart = System.nanoTime();
+               final long updateProvenanceNanos = claimRemovalStart - 
updateProvenanceStart;
+       
+               // Figure out which content claims can be released.
+               // At this point, we will decrement the Claimant Count for the 
claims via the Content Repository.
+               // We do not actually destroy the content because otherwise, we 
could remove the
+               // Original Claim and crash/restart before the 
FlowFileRepository is updated. This will result in the FlowFile being restored 
such that
+               // the content claim points to the Original Claim -- which has 
already been removed!
+               for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> 
entry : checkpoint.records.entrySet()) {
+                   final FlowFile flowFile = entry.getKey();
+                   final StandardRepositoryRecord record = entry.getValue();
+       
+                   if (record.isMarkedForDelete()) {
+                       // if the working claim is not the same as the original 
claim, we can immediately destroy the working claim
+                       // because it was created in this session and is to be 
deleted. We don't need to wait for the FlowFile Repo to sync.
+                       removeContent(record.getWorkingClaim());
+       
+                       if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getWorkingClaim())) {
+                           // if working & original claim are same, don't 
remove twice; we only want to remove the original
+                           // if it's different from the working. Otherwise, 
we remove two claimant counts. This causes
+                           // an issue if we only updated the FlowFile 
attributes.
+                           removeContent(record.getOriginalClaim());
+                       }
+                       final long flowFileLife = System.currentTimeMillis() - 
flowFile.getEntryDate();
+                       final Connectable connectable = 
context.getConnectable();
+                       final Object terminator = connectable instanceof 
ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
+                       LOG.info("{} terminated by {}; life of FlowFile = {} 
ms", new Object[]{flowFile, terminator, flowFileLife});
+                   } else if (record.isWorking() && record.getWorkingClaim() 
!= record.getOriginalClaim()) {
+                       //records which have been updated - remove original if 
exists
+                       removeContent(record.getOriginalClaim());
+                   }
+               }
+       
+               final long claimRemovalFinishNanos = System.nanoTime();
+               final long claimRemovalNanos = claimRemovalFinishNanos - 
claimRemovalStart;
+       
+               // Update the FlowFile Repository
+               try {
+                   final Collection<StandardRepositoryRecord> repoRecords = 
checkpoint.records.values();
+                   
context.getFlowFileRepository().updateRepository((Collection) repoRecords);
+               } catch (final IOException ioe) {
+                   rollback();
+                   throw new ProcessException("FlowFile Repository failed to 
update", ioe);
+               }
+               final long flowFileRepoUpdateFinishNanos = System.nanoTime();
+               final long flowFileRepoUpdateNanos = 
flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
+       
+               updateEventRepository(checkpoint);
+       
+               final long updateEventRepositoryFinishNanos = System.nanoTime();
+               final long updateEventRepositoryNanos = 
updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
+       
+               // transfer the flowfiles to the connections' queues.
+               final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap 
= new HashMap<>();
+               for (final StandardRepositoryRecord record : 
checkpoint.records.values()) {
+                   if (record.isMarkedForAbort() || 
record.isMarkedForDelete()) {
+                       continue; //these don't need to be transferred
+                   }
+                   // record.getCurrent() will return null if this record was 
created in this session --
+                   // in this case, we just ignore it, and it will be cleaned 
up by clearing the records map.
+                   if (record.getCurrent() != null) {
+                       Collection<FlowFileRecord> collection = 
recordMap.get(record.getDestination());
+                       if (collection == null) {
+                           collection = new ArrayList<>();
+                           recordMap.put(record.getDestination(), collection);
+                       }
+                       collection.add(record.getCurrent());
+                   }
+               }
+       
+               for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> 
entry : recordMap.entrySet()) {
+                   entry.getKey().putAll(entry.getValue());
+               }
+       
+               final long enqueueFlowFileFinishNanos = System.nanoTime();
+               final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - 
updateEventRepositoryFinishNanos;
+       
+               // Delete any files from disk that need to be removed.
+               for (final Path path : checkpoint.deleteOnCommit) {
+                   try {
+                       Files.deleteIfExists(path);
+                   } catch (final IOException e) {
+                       throw new FlowFileAccessException("Unable to delete " + 
path.toFile().getAbsolutePath(), e);
+                   }
+               }
+               checkpoint.deleteOnCommit.clear();
+       
+               if (LOG.isInfoEnabled()) {
+                   final String sessionSummary = summarizeEvents(checkpoint);
+                   if (!sessionSummary.isEmpty()) {
+                       LOG.info("{} for {}, committed the following events: 
{}", new Object[]{this, connectableDescription, sessionSummary});
+                   }
+               }
+       
+               for (final Map.Entry<String, Long> entry : 
checkpoint.localCounters.entrySet()) {
+                   adjustCounter(entry.getKey(), entry.getValue(), true);
+               }
+       
+               for (final Map.Entry<String, Long> entry : 
checkpoint.globalCounters.entrySet()) {
+                   adjustCounter(entry.getKey(), entry.getValue(), true);
+               }
+       
+               acknowledgeRecords();
+               resetState();
+       
+               if (LOG.isDebugEnabled()) {
+                   final StringBuilder timingInfo = new StringBuilder();
+                   timingInfo.append("Session commit for 
").append(this).append(" 
[").append(connectableDescription).append("]").append(" took ");
+       
+                   final long commitNanos = System.nanoTime() - 
commitStartNanos;
+                   formatNanos(commitNanos, timingInfo);
+                   timingInfo.append("; FlowFile Repository Update took ");
+                   formatNanos(flowFileRepoUpdateNanos, timingInfo);
+                   timingInfo.append("; Claim Removal took ");
+                   formatNanos(claimRemovalNanos, timingInfo);
+                   timingInfo.append("; FlowFile Event Update took ");
+                   formatNanos(updateEventRepositoryNanos, timingInfo);
+                   timingInfo.append("; Enqueuing FlowFiles took ");
+                   formatNanos(enqueueFlowFileNanos, timingInfo);
+                   timingInfo.append("; Updating Provenance Event Repository 
took ");
+                   formatNanos(updateProvenanceNanos, timingInfo);
+       
+                   LOG.debug(timingInfo.toString());
+               }
+       } catch (final Exception e) {
+               try {
+                       rollback();
+               } catch (final Exception e1) {
+                       e.addSuppressed(e1);
+               }
+               
+               throw e;
+       }
     }
 
     private void updateEventRepository(final Checkpoint checkpoint) {
@@ -2115,13 +2125,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void resetWriteClaims() {
+       resetWriteClaims(true);
+    }
+    
+    private void resetWriteClaims(final boolean suppressExceptions) {
         try {
             if (currentWriteClaimStream != null) {
-                currentWriteClaimStream.flush();
-                currentWriteClaimStream.close();
+               try {
+                       currentWriteClaimStream.flush();
+               } finally {
+                       currentWriteClaimStream.close();
+               }
             }
-        } catch (final Exception e) {
+        } catch (final IOException e) {
+               if (!suppressExceptions) {
+                       throw new FlowFileAccessException("Unable to flush the 
output of FlowFile to the Content Repository");
+               }
         }
+
         currentWriteClaimStream = null;
         currentWriteClaim = null;
         currentWriteClaimFlowFileCount = 0;
@@ -2129,9 +2150,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         for (final ByteCountingOutputStream out : appendableStreams.values()) {
             try {
-                out.flush();
-                out.close();
-            } catch (final Exception e) {
+               try {
+                       out.flush();
+               } finally {
+                       out.close();
+               }
+            } catch (final IOException e) {
+               if (!suppressExceptions) {
+                       throw new FlowFileAccessException("Unable to flush the 
output of FlowFile to the Content Repository");
+               }
             }
         }
         appendableStreams.clear();

Reply via email to