Repository: nifi
Updated Branches:
  refs/heads/master 992e84102 -> 3d4ce3452


NIFI-938: If ResourceClaim is removed while a process has access to its stream, 
don't delete the claim


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d4ce345
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d4ce345
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d4ce345

Branch: refs/heads/master
Commit: 3d4ce34529533ff801a2febae7e669248cb80324
Parents: 992e841
Author: Mark Payne <[email protected]>
Authored: Mon Sep 14 11:47:02 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Sep 14 11:47:02 2015 -0400

----------------------------------------------------------------------
 .../repository/FileSystemRepository.java        | 11 +++++++++-
 .../repository/TestFileSystemRepository.java    | 23 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3d4ce345/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index d06b462..724e26e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -108,6 +108,7 @@ public class FileSystemRepository implements 
ContentRepository {
     // the OutputStream that we can use for writing to the claim.
     private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new 
LinkedBlockingQueue<>(100);
     private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> 
writableClaimStreams = new ConcurrentHashMap<>(100);
+    private final Set<ResourceClaim> activeResourceClaims = 
Collections.synchronizedSet(new HashSet<ResourceClaim>());
 
     private final boolean archiveData;
     private final long maxArchiveMillis;
@@ -600,11 +601,15 @@ public class FileSystemRepository implements 
ContentRepository {
         // two conditions can be checked atomically.
         synchronized (writableClaimQueue) {
             final int claimantCount = 
resourceClaimManager.getClaimantCount(claim);
-            if (claimantCount > 0 || writableClaimQueue.contains(new 
ClaimLengthPair(claim, null))) {
+            if (claimantCount > 0) {
                 // if other content claims are claiming the same resource, we 
have nothing to destroy,
                 // so just consider the destruction successful.
                 return true;
             }
+            if (activeResourceClaims.contains(claim) || 
writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+                // If we have an open OutputStream for the claim, we will not 
destroy the claim.
+                return false;
+            }
         }
 
         Path path = null;
@@ -827,6 +832,8 @@ public class FileSystemRepository implements 
ContentRepository {
             throw new IllegalArgumentException("Cannot write to " + claim + " 
because it has already been written to.");
         }
 
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+
         // we always append because there may be another ContentClaim using 
the same resource claim.
         // However, we know that we will never write to the same claim from 
two different threads
         // at the same time because we will call create() to get the claim 
before we write to it,
@@ -847,6 +854,7 @@ public class FileSystemRepository implements 
ContentRepository {
             }
         }
 
+        activeResourceClaims.add(resourceClaim);
         final ByteCountingOutputStream bcos = claimStream;
         final OutputStream out = new OutputStream() {
             private long bytesWritten = 0L;
@@ -921,6 +929,7 @@ public class FileSystemRepository implements 
ContentRepository {
             @Override
             public synchronized void close() throws IOException {
                 closed = true;
+                activeResourceClaims.remove(resourceClaim);
 
                 if (alwaysSync) {
                     ((FileOutputStream) 
bcos.getWrappedStream()).getFD().sync();

http://git-wip-us.apache.org/repos/asf/nifi/blob/3d4ce345/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 5ffcb3d..88f572b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -348,6 +348,29 @@ public class TestFileSystemRepository {
     }
 
     @Test
+    public void testRemoveWhileWritingToClaim() throws IOException {
+        final ContentClaim claim = repository.create(false);
+        final OutputStream out = repository.write(claim);
+
+        // write at least 1 MB to the output stream so that when we close the 
output stream
+        // the repo won't keep the stream open.
+        final byte[] buff = new byte[1024 * 1024];
+        out.write(buff);
+        out.write(buff);
+
+        // true because claimant count is still 1.
+        assertTrue(repository.remove(claim));
+
+        assertEquals(0, repository.decrementClaimantCount(claim));
+
+        // false because claimant count is 0 but there is an 'active' stream 
for the claim
+        assertFalse(repository.remove(claim));
+
+        out.close();
+        assertTrue(repository.remove(claim));
+    }
+
+    @Test
     public void testMergeWithHeaderFooterDemarcator() throws IOException {
         testMerge("HEADER", "FOOTER", "DEMARCATOR");
     }

Reply via email to