Repository: nifi
Updated Branches:
  refs/heads/master 1df8fe44c -> e3bdee8b1


NIFI-1824: When attempting to archive content that has no remaining claimants, 
ensure that any stream still open for it is closed.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3bdee8b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3bdee8b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3bdee8b

Branch: refs/heads/master
Commit: e3bdee8b1ea27d3fe4f525f87912de74d1f6b68d
Parents: 1df8fe4
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Apr 28 15:04:23 2016 -0400
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Tue May 3 14:27:13 2016 -0400

----------------------------------------------------------------------
 .../repository/FileSystemRepository.java        |  52 +++---
 .../repository/TestFileSystemRepository.java    | 179 +++++++++++++++++--
 .../src/test/resources/nifi.properties          |   2 +
 3 files changed, 196 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e3bdee8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 497e630..ad8ff49 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -540,6 +540,15 @@ public class FileSystemRepository implements 
ContentRepository {
                 resourceClaim = 
resourceClaimManager.newResourceClaim(containerName, section, claimId, 
lossTolerant);
                 resourceOffset = 0L;
                 LOG.debug("Creating new Resource Claim {}", resourceClaim);
+
+                // we always append because there may be another ContentClaim 
using the same resource claim.
+                // However, we know that we will never write to the same claim 
from two different threads
+                // at the same time because we will call create() to get the 
claim before we write to it,
+                // and when we call create(), it will remove it from the 
Queue, which means that no other
+                // thread will get the same Claim until we've finished writing 
to it.
+                final File file = getPath(resourceClaim).toFile();
+                ByteCountingOutputStream claimStream = new 
SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), 
file.length());
+                writableClaimStreams.put(resourceClaim, claimStream);
             } else {
                 resourceClaim = pair.getClaim();
                 resourceOffset = pair.getLength();
@@ -841,25 +850,8 @@ public class FileSystemRepository implements 
ContentRepository {
 
         final ResourceClaim resourceClaim = claim.getResourceClaim();
 
-        // we always append because there may be another ContentClaim using 
the same resource claim.
-        // However, we know that we will never write to the same claim from 
two different threads
-        // at the same time because we will call create() to get the claim 
before we write to it,
-        // and when we call create(), it will remove it from the Queue, which 
means that no other
-        // thread will get the same Claim until we've finished writing to it.
-        ByteCountingOutputStream claimStream = 
writableClaimStreams.remove(scc.getResourceClaim());
-        final long initialLength;
-        if (claimStream == null) {
-            final File file = getPath(scc).toFile();
-            // use a synchronized stream because we want to pass this 
OutputStream out from one thread to another.
-            claimStream = new SynchronizedByteCountingOutputStream(new 
FileOutputStream(file, true), file.length());
-            initialLength = 0L;
-        } else {
-            if (append) {
-                initialLength = Math.max(0, scc.getLength());
-            } else {
-                initialLength = 0;
-            }
-        }
+        ByteCountingOutputStream claimStream = 
writableClaimStreams.get(scc.getResourceClaim());
+        final int initialLength = append ? (int) Math.max(0, scc.getLength()) 
: 0;
 
         activeResourceClaims.add(resourceClaim);
         final ByteCountingOutputStream bcos = claimStream;
@@ -963,9 +955,9 @@ public class FileSystemRepository implements 
ContentRepository {
                     final boolean enqueued = writableClaimQueue.offer(pair);
 
                     if (enqueued) {
-                        writableClaimStreams.put(scc.getResourceClaim(), bcos);
                         LOG.debug("Claim length less than max; Adding {} back 
to writableClaimStreams", this);
                     } else {
+                        writableClaimStreams.remove(scc.getResourceClaim());
                         bcos.close();
 
                         LOG.debug("Claim length less than max; Closing {} 
because could not add back to queue", this);
@@ -1114,6 +1106,19 @@ public class FileSystemRepository implements 
ContentRepository {
             }
         }
 
+        // If the claim count is decremented to 0 (<= 0 as a 'defensive 
programming' strategy), ensure that
+        // we close the stream if there is one. There may be a stream open if 
create() is called and then
+        // the claimant count is decremented without writing to the claim (or more 
specifically, without closing the
+        // OutputStream that is returned when calling write()).
+        final OutputStream out = writableClaimStreams.remove(claim);
+        if (out != null) {
+            try {
+                out.close();
+            } catch (final IOException ioe) {
+                LOG.warn("Unable to close Output Stream for " + claim, ioe);
+            }
+        }
+
         final Path curPath = getPath(claim);
         if (curPath == null) {
             return false;
@@ -1124,7 +1129,12 @@ public class FileSystemRepository implements 
ContentRepository {
         return archived;
     }
 
-    private boolean archive(final Path curPath) throws IOException {
+    protected int getOpenStreamCount() {
+        return writableClaimStreams.size();
+    }
+
+    // marked protected for visibility and ability to override for unit tests.
+    protected boolean archive(final Path curPath) throws IOException {
         // check if already archived
         final boolean alreadyArchived = 
ARCHIVE_DIR_NAME.equals(curPath.getParent().toFile().getName());
         if (alreadyArchived) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3bdee8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index c40d0e3..ed792a4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -35,9 +35,13 @@ import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.util.DiskUtils;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -59,6 +63,7 @@ public class TestFileSystemRepository {
     public static final File helloWorldFile = new 
File("src/test/resources/hello.txt");
 
     private FileSystemRepository repository = null;
+    private StandardResourceClaimManager claimManager = null;
     private final File rootFile = new File("target/content_repository");
 
     @Before
@@ -68,7 +73,8 @@ public class TestFileSystemRepository {
             DiskUtils.deleteRecursively(rootFile);
         }
         repository = new FileSystemRepository();
-        repository.initialize(new StandardResourceClaimManager());
+        claimManager = new StandardResourceClaimManager();
+        repository.initialize(claimManager);
         repository.purge();
     }
 
@@ -79,30 +85,45 @@ public class TestFileSystemRepository {
 
     @Test
     public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws 
Exception {
+        // We are going to construct our own repository using different 
properties, so
+        // we need to shutdown the existing one.
+        shutdown();
+
         Logger root = (Logger) 
LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
         ListAppender<ILoggingEvent> testAppender = new ListAppender<>();
         testAppender.setName("Test");
         testAppender.start();
         root.addAppender(testAppender);
-        
NiFiProperties.getInstance().setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY,
 "1 millis");
-        repository = new FileSystemRepository();
-        repository.initialize(new StandardResourceClaimManager());
-        repository.purge();
+
+        final NiFiProperties properties = NiFiProperties.getInstance();
+        final String originalCleanupFreq = 
properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
+        
properties.setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 
millis");
+        try {
+            repository = new FileSystemRepository();
+            repository.initialize(new StandardResourceClaimManager());
+            repository.purge();
 
 
-        boolean messageFound = false;
-        String message = "The value of 
nifi.content.repository.archive.cleanup.frequency property "
+            boolean messageFound = false;
+            String message = "The value of 
nifi.content.repository.archive.cleanup.frequency property "
                 + "is set to '1 millis' which is below the allowed minimum of 
1 second (1000 milliseconds). "
                 + "Minimum value of 1 sec will be used as scheduling interval 
for archive cleanup task.";
-        for (ILoggingEvent event : testAppender.list) {
-            String actualMessage = event.getFormattedMessage();
-            if (actualMessage.equals(message)) {
-                assertEquals(event.getLevel(), Level.WARN);
-                messageFound = true;
-                break;
+            for (ILoggingEvent event : testAppender.list) {
+                String actualMessage = event.getFormattedMessage();
+                if (actualMessage.equals(message)) {
+                    assertEquals(event.getLevel(), Level.WARN);
+                    messageFound = true;
+                    break;
+                }
+            }
+            assertTrue(messageFound);
+        } finally {
+            if (originalCleanupFreq == null) {
+                
properties.remove(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
+            } else {
+                
properties.setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, 
originalCleanupFreq);
             }
         }
-        assertTrue(messageFound);
     }
 
     @Test
@@ -357,13 +378,13 @@ public class TestFileSystemRepository {
 
     @Test(expected = ContentNotFoundException.class)
     public void testSizeWithNoContent() throws IOException {
-        final ContentClaim claim = repository.create(true);
+        final ContentClaim claim = new StandardContentClaim(new 
StandardResourceClaim("container1", "section 1", "1", false), 0L);
         assertEquals(0L, repository.size(claim));
     }
 
     @Test(expected = ContentNotFoundException.class)
     public void testReadWithNoContent() throws IOException {
-        final ContentClaim claim = repository.create(true);
+        final ContentClaim claim = new StandardContentClaim(new 
StandardResourceClaim("container1", "section 1", "1", false), 0L);
         final InputStream in = repository.read(claim);
         in.close();
     }
@@ -422,6 +443,132 @@ public class TestFileSystemRepository {
     }
 
     @Test
+    public void testMarkDestructableDoesNotArchiveIfStreamOpenAndWrittenTo() 
throws IOException, InterruptedException {
+        FileSystemRepository repository = null;
+        try {
+            final List<Path> archivedPaths = Collections.synchronizedList(new 
ArrayList<Path>());
+
+            // We are creating our own 'local' repository in this test so shut 
down the one created in the setup() method
+            shutdown();
+
+            repository = new FileSystemRepository() {
+                @Override
+                protected boolean archive(Path curPath) throws IOException {
+                    archivedPaths.add(curPath);
+                    return true;
+                }
+            };
+
+            final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+            repository.initialize(claimManager);
+            repository.purge();
+
+            final ContentClaim claim = repository.create(false);
+
+            // Create a stream and write a bit to it, then close it. This will 
cause the
+            // claim to be put back onto the 'writableClaimQueue'
+            try (final OutputStream out = repository.write(claim)) {
+                assertEquals(1, 
claimManager.getClaimantCount(claim.getResourceClaim()));
+                out.write("1\n".getBytes());
+            }
+
+            assertEquals(1, 
claimManager.getClaimantCount(claim.getResourceClaim()));
+
+            int claimantCount = 
claimManager.decrementClaimantCount(claim.getResourceClaim());
+            assertEquals(0, claimantCount);
+            assertTrue(archivedPaths.isEmpty());
+
+            claimManager.markDestructable(claim.getResourceClaim());
+
+            // Wait for the archive thread to have a chance to run
+            Thread.sleep(2000L);
+
+            // Should still be empty because we have a stream open to the file.
+            assertTrue(archivedPaths.isEmpty());
+            assertEquals(0, 
claimManager.getClaimantCount(claim.getResourceClaim()));
+        } finally {
+            if (repository != null) {
+                repository.shutdown();
+            }
+        }
+    }
+
+
+    /**
+     * We have encountered a situation where the File System Repo is moving 
files to archive and then eventually
+     * aging them off while there is still an open file handle. This test is 
meant to replicate the conditions under
+     * which this would happen and verify that it is fixed.
+     *
+     * The condition that caused this appears to be that a Process Session 
created a Content Claim and then did not write
+     * to it. It then decremented the claimant count (which reduced the count 
to 0). This was likely due to creating the
+     * claim in ProcessSession.write(FlowFile, StreamCallback) and then having 
an Exception thrown when the Process Session
+     * attempts to read the current Content Claim. In this case, it would not 
ever get to the point of calling
+     * FileSystemRepository.write().
+     *
+     * The above sequence of events is problematic because calling 
FileSystemRepository.create() will remove the Resource Claim
+     * from the 'writable claims queue' and expects that we will write to it. 
When we call FileSystemRepository.write() with that
+     * Resource Claim, we return an OutputStream that, when closed, will take 
care of adding the Resource Claim back to the
+     * 'writable claims queue' or otherwise close the FileOutputStream that is 
open for that Resource Claim. If FileSystemRepository.write()
+     * is never called, or if the OutputStream returned by that method is 
never closed, but the claimant count is then decremented to 0,
+     * we can get into a situation where we do archive the content (because 
the claimant count is 0 and it is not in the 'writable claims queue')
+     * and then eventually age it off, without ever closing the OutputStream. 
We need to ensure that we do always close that Output Stream.
+     */
+    @Test
+    public void 
testMarkDestructableDoesNotArchiveIfStreamOpenAndNotWrittenTo() throws 
IOException, InterruptedException {
+        FileSystemRepository repository = null;
+        try {
+            final List<Path> archivedPathsWithOpenStream = 
Collections.synchronizedList(new ArrayList<Path>());
+
+            // We are creating our own 'local' repository in this test so shut 
down the one created in the setup() method
+            shutdown();
+
+            repository = new FileSystemRepository() {
+                @Override
+                protected boolean archive(Path curPath) throws IOException {
+                    if (getOpenStreamCount() > 0) {
+                        archivedPathsWithOpenStream.add(curPath);
+                    }
+
+                    return true;
+                }
+            };
+
+            final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+            repository.initialize(claimManager);
+            repository.purge();
+
+            final ContentClaim claim = repository.create(false);
+
+            assertEquals(1, 
claimManager.getClaimantCount(claim.getResourceClaim()));
+
+            int claimantCount = 
claimManager.decrementClaimantCount(claim.getResourceClaim());
+            assertEquals(0, claimantCount);
+            assertTrue(archivedPathsWithOpenStream.isEmpty());
+
+            // This would happen when FlowFile repo is checkpointed, if 
Resource Claim has claimant count of 0.
+            // Since the Resource Claim of interest is still 'writable', we 
should not archive it.
+            claimManager.markDestructable(claim.getResourceClaim());
+
+            // Wait for the archive thread to have a chance to run
+            long totalSleepMillis = 0;
+            final long startTime = System.nanoTime();
+            while (archivedPathsWithOpenStream.isEmpty() && totalSleepMillis < 
5000) {
+                Thread.sleep(100L);
+                totalSleepMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            }
+
+            // Should still be empty because we have a stream open to the file 
so we should
+            // not actually try to archive the data.
+            assertTrue(archivedPathsWithOpenStream.isEmpty());
+            assertEquals(0, 
claimManager.getClaimantCount(claim.getResourceClaim()));
+        } finally {
+            if (repository != null) {
+                repository.shutdown();
+            }
+        }
+    }
+
+    @Test
     public void testMergeWithHeaderFooterDemarcator() throws IOException {
         testMerge("HEADER", "FOOTER", "DEMARCATOR");
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3bdee8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
index 210e7c6..314304f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties
@@ -49,6 +49,8 @@ nifi.swap.out.threads=4
 nifi.content.claim.max.appendable.size=10 MB
 nifi.content.claim.max.flow.files=100
 nifi.content.repository.directory.default=./target/content_repository
+nifi.content.repository.archive.enabled=true
+nifi.content.repository.archive.max.usage.percentage=95%
 
 # Provenance Repository Properties
 nifi.provenance.repository.storage.directory=./target/provenance_repository

Reply via email to