Repository: nifi Updated Branches: refs/heads/master 992e84102 -> 3d4ce3452
NIFI-938: If ResourceClaim is removed while a process has access to its stream, don't delete the claim Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d4ce345 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d4ce345 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d4ce345 Branch: refs/heads/master Commit: 3d4ce34529533ff801a2febae7e669248cb80324 Parents: 992e841 Author: Mark Payne <[email protected]> Authored: Mon Sep 14 11:47:02 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Mon Sep 14 11:47:02 2015 -0400 ---------------------------------------------------------------------- .../repository/FileSystemRepository.java | 11 +++++++++- .../repository/TestFileSystemRepository.java | 23 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/3d4ce345/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index d06b462..724e26e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -108,6 +108,7 @@ public class FileSystemRepository implements ContentRepository { // the OutputStream that we can use for writing to the claim. private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100); private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100); + private final Set<ResourceClaim> activeResourceClaims = Collections.synchronizedSet(new HashSet<ResourceClaim>()); private final boolean archiveData; private final long maxArchiveMillis; @@ -600,11 +601,15 @@ public class FileSystemRepository implements ContentRepository { // two conditions can be checked atomically. synchronized (writableClaimQueue) { final int claimantCount = resourceClaimManager.getClaimantCount(claim); - if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { + if (claimantCount > 0) { // if other content claims are claiming the same resource, we have nothing to destroy, // so just consider the destruction successful. return true; } + if (activeResourceClaims.contains(claim) || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { + // If we have an open OutputStream for the claim, we will not destroy the claim. + return false; + } } Path path = null; @@ -827,6 +832,8 @@ public class FileSystemRepository implements ContentRepository { throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to."); } + final ResourceClaim resourceClaim = claim.getResourceClaim(); + // we always append because there may be another ContentClaim using the same resource claim. // However, we know that we will never write to the same claim from two different threads // at the same time because we will call create() to get the claim before we write to it, @@ -847,6 +854,7 @@ public class FileSystemRepository implements ContentRepository { } } + activeResourceClaims.add(resourceClaim); final ByteCountingOutputStream bcos = claimStream; final OutputStream out = new OutputStream() { private long bytesWritten = 0L; @@ -921,6 +929,7 @@ public class FileSystemRepository implements ContentRepository { @Override public synchronized void close() throws IOException { closed = true; + activeResourceClaims.remove(resourceClaim); if (alwaysSync) { ((FileOutputStream) bcos.getWrappedStream()).getFD().sync(); http://git-wip-us.apache.org/repos/asf/nifi/blob/3d4ce345/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 5ffcb3d..88f572b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -348,6 +348,29 @@ public class TestFileSystemRepository { } @Test + public void testRemoveWhileWritingToClaim() throws IOException { + final ContentClaim claim = repository.create(false); + final OutputStream out = repository.write(claim); + + // write at least 1 MB to the output stream so that when we close the output stream + // the repo won't keep the stream open. + final byte[] buff = new byte[1024 * 1024]; + out.write(buff); + out.write(buff); + + // true because claimant count is still 1. + assertTrue(repository.remove(claim)); + + assertEquals(0, repository.decrementClaimantCount(claim)); + + // false because claimant count is 0 but there is an 'active' stream for the claim + assertFalse(repository.remove(claim)); + + out.close(); + assertTrue(repository.remove(claim)); + } + + @Test public void testMergeWithHeaderFooterDemarcator() throws IOException { testMerge("HEADER", "FOOTER", "DEMARCATOR"); }
