Repository: nifi Updated Branches: refs/heads/master 1df8fe44c -> e3bdee8b1
NIFI-1824: If attempting to archive content, and there are no claimant counts for it, ensure that the stream is closed. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3bdee8b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3bdee8b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3bdee8b Branch: refs/heads/master Commit: e3bdee8b1ea27d3fe4f525f87912de74d1f6b68d Parents: 1df8fe4 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Apr 28 15:04:23 2016 -0400 Committer: Oleg Zhurakousky <o...@suitcase.io> Committed: Tue May 3 14:27:13 2016 -0400 ---------------------------------------------------------------------- .../repository/FileSystemRepository.java | 52 +++--- .../repository/TestFileSystemRepository.java | 179 +++++++++++++++++-- .../src/test/resources/nifi.properties | 2 + 3 files changed, 196 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e3bdee8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 497e630..ad8ff49 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -540,6 +540,15 @@ public class 
FileSystemRepository implements ContentRepository { resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant); resourceOffset = 0L; LOG.debug("Creating new Resource Claim {}", resourceClaim); + + // we always append because there may be another ContentClaim using the same resource claim. + // However, we know that we will never write to the same claim from two different threads + // at the same time because we will call create() to get the claim before we write to it, + // and when we call create(), it will remove it from the Queue, which means that no other + // thread will get the same Claim until we've finished writing to it. + final File file = getPath(resourceClaim).toFile(); + ByteCountingOutputStream claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length()); + writableClaimStreams.put(resourceClaim, claimStream); } else { resourceClaim = pair.getClaim(); resourceOffset = pair.getLength(); @@ -841,25 +850,8 @@ public class FileSystemRepository implements ContentRepository { final ResourceClaim resourceClaim = claim.getResourceClaim(); - // we always append because there may be another ContentClaim using the same resource claim. - // However, we know that we will never write to the same claim from two different threads - // at the same time because we will call create() to get the claim before we write to it, - // and when we call create(), it will remove it from the Queue, which means that no other - // thread will get the same Claim until we've finished writing to it. - ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim()); - final long initialLength; - if (claimStream == null) { - final File file = getPath(scc).toFile(); - // use a synchronized stream because we want to pass this OutputStream out from one thread to another. 
- claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length()); - initialLength = 0L; - } else { - if (append) { - initialLength = Math.max(0, scc.getLength()); - } else { - initialLength = 0; - } - } + ByteCountingOutputStream claimStream = writableClaimStreams.get(scc.getResourceClaim()); + final int initialLength = append ? (int) Math.max(0, scc.getLength()) : 0; activeResourceClaims.add(resourceClaim); final ByteCountingOutputStream bcos = claimStream; @@ -963,9 +955,9 @@ public class FileSystemRepository implements ContentRepository { final boolean enqueued = writableClaimQueue.offer(pair); if (enqueued) { - writableClaimStreams.put(scc.getResourceClaim(), bcos); LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this); } else { + writableClaimStreams.remove(scc.getResourceClaim()); bcos.close(); LOG.debug("Claim length less than max; Closing {} because could not add back to queue", this); @@ -1114,6 +1106,19 @@ public class FileSystemRepository implements ContentRepository { } } + // If the claim count is decremented to 0 (<= 0 as a 'defensive programming' strategy), ensure that + // we close the stream if there is one. There may be a stream open if create() is called and then + // claimant count is removed without writing to the claim (or more specifically, without closing the + // OutputStream that is returned when calling write() ). 
+ final OutputStream out = writableClaimStreams.remove(claim); + if (out != null) { + try { + out.close(); + } catch (final IOException ioe) { + LOG.warn("Unable to close Output Stream for " + claim, ioe); + } + } + final Path curPath = getPath(claim); if (curPath == null) { return false; @@ -1124,7 +1129,12 @@ public class FileSystemRepository implements ContentRepository { return archived; } - private boolean archive(final Path curPath) throws IOException { + protected int getOpenStreamCount() { + return writableClaimStreams.size(); + } + + // marked protected for visibility and ability to override for unit tests. + protected boolean archive(final Path curPath) throws IOException { // check if already archived final boolean alreadyArchived = ARCHIVE_DIR_NAME.equals(curPath.getParent().toFile().getName()); if (alreadyArchived) { http://git-wip-us.apache.org/repos/asf/nifi/blob/e3bdee8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index c40d0e3..ed792a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -35,9 +35,13 @@ import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import 
java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.util.DiskUtils; import org.apache.nifi.stream.io.StreamUtils; @@ -59,6 +63,7 @@ public class TestFileSystemRepository { public static final File helloWorldFile = new File("src/test/resources/hello.txt"); private FileSystemRepository repository = null; + private StandardResourceClaimManager claimManager = null; private final File rootFile = new File("target/content_repository"); @Before @@ -68,7 +73,8 @@ public class TestFileSystemRepository { DiskUtils.deleteRecursively(rootFile); } repository = new FileSystemRepository(); - repository.initialize(new StandardResourceClaimManager()); + claimManager = new StandardResourceClaimManager(); + repository.initialize(claimManager); repository.purge(); } @@ -79,30 +85,45 @@ public class TestFileSystemRepository { @Test public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception { + // We are going to construct our own repository using different properties, so + // we need to shutdown the existing one. 
+ shutdown(); + Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); ListAppender<ILoggingEvent> testAppender = new ListAppender<>(); testAppender.setName("Test"); testAppender.start(); root.addAppender(testAppender); - NiFiProperties.getInstance().setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis"); - repository = new FileSystemRepository(); - repository.initialize(new StandardResourceClaimManager()); - repository.purge(); + + final NiFiProperties properties = NiFiProperties.getInstance(); + final String originalCleanupFreq = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); + properties.setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis"); + try { + repository = new FileSystemRepository(); + repository.initialize(new StandardResourceClaimManager()); + repository.purge(); - boolean messageFound = false; - String message = "The value of nifi.content.repository.archive.cleanup.frequency property " + boolean messageFound = false; + String message = "The value of nifi.content.repository.archive.cleanup.frequency property " + "is set to '1 millis' which is below the allowed minimum of 1 second (1000 milliseconds). 
" + "Minimum value of 1 sec will be used as scheduling interval for archive cleanup task."; - for (ILoggingEvent event : testAppender.list) { - String actualMessage = event.getFormattedMessage(); - if (actualMessage.equals(message)) { - assertEquals(event.getLevel(), Level.WARN); - messageFound = true; - break; + for (ILoggingEvent event : testAppender.list) { + String actualMessage = event.getFormattedMessage(); + if (actualMessage.equals(message)) { + assertEquals(event.getLevel(), Level.WARN); + messageFound = true; + break; + } + } + assertTrue(messageFound); + } finally { + if (originalCleanupFreq == null) { + properties.remove(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); + } else { + properties.setProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, originalCleanupFreq); } } - assertTrue(messageFound); } @Test @@ -357,13 +378,13 @@ public class TestFileSystemRepository { @Test(expected = ContentNotFoundException.class) public void testSizeWithNoContent() throws IOException { - final ContentClaim claim = repository.create(true); + final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim("container1", "section 1", "1", false), 0L); assertEquals(0L, repository.size(claim)); } @Test(expected = ContentNotFoundException.class) public void testReadWithNoContent() throws IOException { - final ContentClaim claim = repository.create(true); + final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim("container1", "section 1", "1", false), 0L); final InputStream in = repository.read(claim); in.close(); } @@ -422,6 +443,132 @@ public class TestFileSystemRepository { } @Test + public void testMarkDestructableDoesNotArchiveIfStreamOpenAndWrittenTo() throws IOException, InterruptedException { + FileSystemRepository repository = null; + try { + final List<Path> archivedPaths = Collections.synchronizedList(new ArrayList<Path>()); + + // We are creating our own 'local' repository in this test so shut down the one created in 
the setup() method + shutdown(); + + repository = new FileSystemRepository() { + @Override + protected boolean archive(Path curPath) throws IOException { + archivedPaths.add(curPath); + return true; + } + }; + + final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); + repository.initialize(claimManager); + repository.purge(); + + final ContentClaim claim = repository.create(false); + + // Create a stream and write a bit to it, then close it. This will cause the + // claim to be put back onto the 'writableClaimsQueue' + try (final OutputStream out = repository.write(claim)) { + assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim())); + out.write("1\n".getBytes()); + } + + assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim())); + + int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim()); + assertEquals(0, claimantCount); + assertTrue(archivedPaths.isEmpty()); + + claimManager.markDestructable(claim.getResourceClaim()); + + // Wait for the archive thread to have a chance to run + Thread.sleep(2000L); + + // Should still be empty because we have a stream open to the file. + assertTrue(archivedPaths.isEmpty()); + assertEquals(0, claimManager.getClaimantCount(claim.getResourceClaim())); + } finally { + if (repository != null) { + repository.shutdown(); + } + } + } + + + /** + * We have encountered a situation where the File System Repo is moving files to archive and then eventually + * aging them off while there is still an open file handle. This test is meant to replicate the conditions under + * which this would happen and verify that it is fixed. + * + * The condition that caused this appears to be that a Process Session created a Content Claim and then did not write + * to it. It then decremented the claimant count (which reduced the count to 0). 
This was likely due to creating the + claim in ProcessSession.write(FlowFile, StreamCallback) and then having an Exception thrown when the Process Session + attempts to read the current Content Claim. In this case, it would never reach the point of calling + FileSystemRepository.write(). + * + * The above sequence of events is problematic because calling FileSystemRepository.create() will remove the Resource Claim + * from the 'writable claims queue' and expects that we will write to it. When we call FileSystemRepository.write() with that + * Resource Claim, we return an OutputStream that, when closed, will take care of adding the Resource Claim back to the + * 'writable claims queue' or otherwise closing the FileOutputStream that is open for that Resource Claim. If FileSystemRepository.write() + * is never called, or if the OutputStream returned by that method is never closed, but the Content Claim's claimant count is then decremented to 0, + * we can get into a situation where we archive the content (because the claimant count is 0 and it is not in the 'writable claims queue') + * and then eventually age it off, without ever closing the OutputStream. We need to ensure that we always close that OutputStream.
+ */ + @Test + public void testMarkDestructableDoesNotArchiveIfStreamOpenAndNotWrittenTo() throws IOException, InterruptedException { + FileSystemRepository repository = null; + try { + final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>()); + + // We are creating our own 'local' repository in this test so shut down the one created in the setup() method + shutdown(); + + repository = new FileSystemRepository() { + @Override + protected boolean archive(Path curPath) throws IOException { + if (getOpenStreamCount() > 0) { + archivedPathsWithOpenStream.add(curPath); + } + + return true; + } + }; + + final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); + repository.initialize(claimManager); + repository.purge(); + + final ContentClaim claim = repository.create(false); + + assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim())); + + int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim()); + assertEquals(0, claimantCount); + assertTrue(archivedPathsWithOpenStream.isEmpty()); + + // This would happen when FlowFile repo is checkpointed, if Resource Claim has claimant count of 0. + // Since the Resource Claim of interest is still 'writable', we should not archive it. + claimManager.markDestructable(claim.getResourceClaim()); + + // Wait for the archive thread to have a chance to run + long totalSleepMillis = 0; + final long startTime = System.nanoTime(); + while (archivedPathsWithOpenStream.isEmpty() && totalSleepMillis < 5000) { + Thread.sleep(100L); + totalSleepMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + } + + // Should still be empty because we have a stream open to the file so we should + // not actually try to archive the data. 
+ assertTrue(archivedPathsWithOpenStream.isEmpty()); + assertEquals(0, claimManager.getClaimantCount(claim.getResourceClaim())); + } finally { + if (repository != null) { + repository.shutdown(); + } + } + } + + @Test public void testMergeWithHeaderFooterDemarcator() throws IOException { testMerge("HEADER", "FOOTER", "DEMARCATOR"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e3bdee8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties index 210e7c6..314304f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties @@ -49,6 +49,8 @@ nifi.swap.out.threads=4 nifi.content.claim.max.appendable.size=10 MB nifi.content.claim.max.flow.files=100 nifi.content.repository.directory.default=./target/content_repository +nifi.content.repository.archive.enabled=true +nifi.content.repository.archive.max.usage.percentage=95% # Provenance Repository Properties nifi.provenance.repository.storage.directory=./target/provenance_repository