This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 49b7a7c  NIFI-6846: If an Exception is thrown while a Processor is 
writing to a FlowFile, but that Content Claim is not yet eligible for 
destruction, mark it as a transient claim on the RepositoryRecord so that if 
it's available when the FlowFile Repository is checkpointed, then it will be 
cleaned up then
49b7a7c is described below

commit 49b7a7cd6b2645188a732bc57122d1757516e2bd
Author: Mark Payne <[email protected]>
AuthorDate: Tue Nov 5 13:36:45 2019 -0500

    NIFI-6846: If an Exception is thrown while a Processor is writing to a 
FlowFile, but that Content Claim is not yet eligible for destruction, mark it 
as a transient claim on the RepositoryRecord so that if it's available when the 
FlowFile Repository is checkpointed, then it will be cleaned up then
    
    This closes #3872
---
 .../repository/StandardProcessSession.java         | 51 ++++++++++++----------
 .../repository/WriteAheadFlowFileRepository.java   |  3 ++
 2 files changed, 32 insertions(+), 22 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 5684ab6..f7249c0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1116,15 +1116,22 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
      *
      * @param claim claim to destroy
      */
-    private void destroyContent(final ContentClaim claim) {
+    private void destroyContent(final ContentClaim claim, final 
StandardRepositoryRecord repoRecord) {
         if (claim == null) {
             return;
         }
 
         final int decrementedClaimCount = 
context.getContentRepository().decrementClaimantCount(claim);
+        boolean removed = false;
         if (decrementedClaimCount <= 0) {
             resetWriteClaims(); // Have to ensure that we are not currently 
writing to the claim before we can destroy it.
-            context.getContentRepository().remove(claim);
+            removed = context.getContentRepository().remove(claim);
+        }
+
+        // If we were not able to remove the content claim yet, mark it as a 
transient claim so that it will be cleaned up when the
+        // FlowFile Repository is updated if it's available for cleanup at 
that time.
+        if (!removed) {
+            repoRecord.addTransientClaim(claim);
         }
     }
 
@@ -2554,14 +2561,14 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 bytesRead += readCount;
             }
         } catch (final ContentNotFoundException nfe) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, destinationRecord);
             handleContentNotFound(nfe, destinationRecord);
             handleContentNotFound(nfe, sourceRecords);
         } catch (final IOException ioe) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, destinationRecord);
             throw new FlowFileAccessException("Failed to merge " + 
sources.size() + " into " + destination + " due to " + ioe.toString(), ioe);
         } catch (final Throwable t) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, destinationRecord);
             throw t;
         }
 
@@ -2690,20 +2697,20 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             return createTaskTerminationStream(errorHandlingOutputStream);
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             handleContentNotFound(nfe, record);
             throw nfe;
         } catch (final FlowFileAccessException ffae) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw ffae;
         } catch (final IOException ioe) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw new ProcessException("IOException thrown from " + 
connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final Throwable t) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw t;
         }
     }
@@ -2737,19 +2744,19 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             }
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             handleContentNotFound(nfe, record);
         } catch (final FlowFileAccessException ffae) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw ffae;
         } catch (final IOException ioe) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw new ProcessException("IOException thrown from " + 
connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final Throwable t) {
             resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw t;
         }
 
@@ -2835,7 +2842,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             // whenever the FlowFile is removed, the claim count will be 
decremented; if we decremented
             // it here also, we would be decrementing the claimant count twice!
             if (newClaim != oldClaim) {
-                destroyContent(newClaim);
+                destroyContent(newClaim, record);
             }
 
             handleContentNotFound(nfe, record);
@@ -2844,7 +2851,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
             // See above explanation for why this is done only if newClaim != 
oldClaim
             if (newClaim != oldClaim) {
-                destroyContent(newClaim);
+                destroyContent(newClaim, record);
             }
 
             throw new ProcessException("IOException thrown from " + 
connectableDescription + ": " + ioe.toString(), ioe);
@@ -2853,7 +2860,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
             // See above explanation for why this is done only if newClaim != 
oldClaim
             if (newClaim != oldClaim) {
-                destroyContent(newClaim);
+                destroyContent(newClaim, record);
             }
 
             throw t;
@@ -3004,16 +3011,16 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 }
             }
         } catch (final ContentNotFoundException nfe) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             handleContentNotFound(nfe, record);
         } catch (final IOException ioe) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw new ProcessException("IOException thrown from " + 
connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final FlowFileAccessException ffae) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw ffae;
         } catch (final Throwable t) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw t;
         }
 
@@ -3060,7 +3067,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             bytesWritten += newSize;
             bytesRead += newSize;
         } catch (final Throwable t) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw new FlowFileAccessException("Failed to import data from " + 
source + " for " + destination + " due to " + t.toString(), t);
         }
 
@@ -3104,7 +3111,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             }
         } catch (final Throwable t) {
             if (newClaim != null) {
-                destroyContent(newClaim);
+                destroyContent(newClaim, record);
             }
 
             throw new FlowFileAccessException("Failed to import data from " + 
source + " for " + destination + " due to " + t.toString(), t);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 2ba59c7..dcd0f32 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -511,7 +511,10 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                 swapLocationsRemoved.add(swapLocation);
                 swapLocationsAdded.remove(swapLocation);
             }
+        }
 
+        // Once the content claim counts have been updated for all records, 
collect any transient claims that are eligible for destruction
+        for (final RepositoryRecord record : repositoryRecords) {
             final List<ContentClaim> transientClaims = 
record.getTransientClaims();
             if (transientClaims != null) {
                 for (final ContentClaim transientClaim : transientClaims) {

Reply via email to