This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 49b7a7c NIFI-6846: If an Exception is thrown while a Processor is
writing to a FlowFile, but that Content Claim is not yet eligible for
destruction, mark it as a transient claim on the RepositoryRecord so that if
it's available when the FlowFile Repository is checkpointed, then it will be
cleaned up then
49b7a7c is described below
commit 49b7a7cd6b2645188a732bc57122d1757516e2bd
Author: Mark Payne <[email protected]>
AuthorDate: Tue Nov 5 13:36:45 2019 -0500
NIFI-6846: If an Exception is thrown while a Processor is writing to a
FlowFile, but that Content Claim is not yet eligible for destruction, mark it
as a transient claim on the RepositoryRecord so that if it's available when the
FlowFile Repository is checkpointed, then it will be cleaned up then
This closes #3872
---
.../repository/StandardProcessSession.java | 51 ++++++++++++----------
.../repository/WriteAheadFlowFileRepository.java | 3 ++
2 files changed, 32 insertions(+), 22 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 5684ab6..f7249c0 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1116,15 +1116,22 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
*
* @param claim claim to destroy
*/
- private void destroyContent(final ContentClaim claim) {
+ private void destroyContent(final ContentClaim claim, final
StandardRepositoryRecord repoRecord) {
if (claim == null) {
return;
}
final int decrementedClaimCount =
context.getContentRepository().decrementClaimantCount(claim);
+ boolean removed = false;
if (decrementedClaimCount <= 0) {
resetWriteClaims(); // Have to ensure that we are not currently
writing to the claim before we can destroy it.
- context.getContentRepository().remove(claim);
+ removed = context.getContentRepository().remove(claim);
+ }
+
+ // If we were not able to remove the content claim yet, mark it as a
transient claim so that it will be cleaned up when the
+ // FlowFile Repository is updated if it's available for cleanup at
that time.
+ if (!removed) {
+ repoRecord.addTransientClaim(claim);
}
}
@@ -2554,14 +2561,14 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
bytesRead += readCount;
}
} catch (final ContentNotFoundException nfe) {
- destroyContent(newClaim);
+ destroyContent(newClaim, destinationRecord);
handleContentNotFound(nfe, destinationRecord);
handleContentNotFound(nfe, sourceRecords);
} catch (final IOException ioe) {
- destroyContent(newClaim);
+ destroyContent(newClaim, destinationRecord);
throw new FlowFileAccessException("Failed to merge " +
sources.size() + " into " + destination + " due to " + ioe.toString(), ioe);
} catch (final Throwable t) {
- destroyContent(newClaim);
+ destroyContent(newClaim, destinationRecord);
throw t;
}
@@ -2690,20 +2697,20 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
return createTaskTerminationStream(errorHandlingOutputStream);
} catch (final ContentNotFoundException nfe) {
resetWriteClaims(); // need to reset write claim before we can
remove the claim
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
handleContentNotFound(nfe, record);
throw nfe;
} catch (final FlowFileAccessException ffae) {
resetWriteClaims(); // need to reset write claim before we can
remove the claim
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw ffae;
} catch (final IOException ioe) {
resetWriteClaims(); // need to reset write claim before we can
remove the claim
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw new ProcessException("IOException thrown from " +
connectableDescription + ": " + ioe.toString(), ioe);
} catch (final Throwable t) {
resetWriteClaims(); // need to reset write claim before we can
remove the claim
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw t;
}
}
@@ -2737,19 +2744,19 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
}
} catch (final ContentNotFoundException nfe) {
resetWriteClaims(); // need to reset write claim before we can
remove the claim
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
handleContentNotFound(nfe, record);
} catch (final FlowFileAccessException ffae) {
resetWriteClaims(); // need to reset write claim before we can
remove the claim
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw ffae;
} catch (final IOException ioe) {
resetWriteClaims(); // need to reset write claim before we can
remove the claim
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw new ProcessException("IOException thrown from " +
connectableDescription + ": " + ioe.toString(), ioe);
} catch (final Throwable t) {
resetWriteClaims(); // need to reset write claim before we can
remove the claim
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw t;
}
@@ -2835,7 +2842,7 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
// whenever the FlowFile is removed, the claim count will be
decremented; if we decremented
// it here also, we would be decrementing the claimant count twice!
if (newClaim != oldClaim) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
}
handleContentNotFound(nfe, record);
@@ -2844,7 +2851,7 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
// See above explanation for why this is done only if newClaim !=
oldClaim
if (newClaim != oldClaim) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
}
throw new ProcessException("IOException thrown from " +
connectableDescription + ": " + ioe.toString(), ioe);
@@ -2853,7 +2860,7 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
// See above explanation for why this is done only if newClaim !=
oldClaim
if (newClaim != oldClaim) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
}
throw t;
@@ -3004,16 +3011,16 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
}
}
} catch (final ContentNotFoundException nfe) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
handleContentNotFound(nfe, record);
} catch (final IOException ioe) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw new ProcessException("IOException thrown from " +
connectableDescription + ": " + ioe.toString(), ioe);
} catch (final FlowFileAccessException ffae) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw ffae;
} catch (final Throwable t) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw t;
}
@@ -3060,7 +3067,7 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
bytesWritten += newSize;
bytesRead += newSize;
} catch (final Throwable t) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
throw new FlowFileAccessException("Failed to import data from " +
source + " for " + destination + " due to " + t.toString(), t);
}
@@ -3104,7 +3111,7 @@ public final class StandardProcessSession implements
ProcessSession, ProvenanceE
}
} catch (final Throwable t) {
if (newClaim != null) {
- destroyContent(newClaim);
+ destroyContent(newClaim, record);
}
throw new FlowFileAccessException("Failed to import data from " +
source + " for " + destination + " due to " + t.toString(), t);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 2ba59c7..dcd0f32 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -511,7 +511,10 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
swapLocationsRemoved.add(swapLocation);
swapLocationsAdded.remove(swapLocation);
}
+ }
+ // Once the content claim counts have been updated for all records,
collect any transient claims that are eligible for destruction
+ for (final RepositoryRecord record : repositoryRecords) {
final List<ContentClaim> transientClaims =
record.getTransientClaims();
if (transientClaims != null) {
for (final ContentClaim transientClaim : transientClaims) {