Repository: nifi Updated Branches: refs/heads/master d421e3c24 -> 4f6c1cfff
NIFI-744: Addressed feedback from review, mostly adding documentation to a few points in the code Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15a8699d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15a8699d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15a8699d Branch: refs/heads/master Commit: 15a8699dc49d6d5ac4a450f3eaea52def7addb63 Parents: 68d94cc Author: Mark Payne <[email protected]> Authored: Fri Aug 21 10:52:40 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Fri Aug 21 11:08:34 2015 -0400 ---------------------------------------------------------------------- .../repository/ContentRepository.java | 34 -------------- .../apache/nifi/controller/FlowController.java | 10 ++++ .../repository/FileSystemRepository.java | 48 +++++++++++++------- .../repository/VolatileContentRepository.java | 20 ++------ .../controller/TestFileSystemSwapManager.java | 1 - .../repository/TestStandardProcessSession.java | 16 ------- 6 files changed, 46 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java index 8d0bdb3..b1ea87c 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java @@ -165,24 +165,6 @@ public interface ContentRepository { long importFrom(Path content, ContentClaim claim) throws IOException; /** - * Imports content from the given path to the specified claim, appending or - * replacing the current claim, according to the value of the append - * argument - * - * @return the size of the claim - * @param content to import from - * @param claim the claim to write imported content to - * @param append if true, the content will be appended to the claim; if - * false, the content will replace the contents of the claim - * @throws IOException if unable to read content - * - * @deprecated if needing to append to a content claim, the contents of the claim should be - * copied to a new claim and then the data to append should be written to that new claim. - */ - @Deprecated - long importFrom(Path content, ContentClaim claim, boolean append) throws IOException; - - /** * Imports content from the given stream creating a new content object and * claim within the repository. * @@ -194,22 +176,6 @@ public interface ContentRepository { long importFrom(InputStream content, ContentClaim claim) throws IOException; /** - * Imports content from the given stream, appending or replacing the current - * claim, according to the value of the appen dargument - * - * @param content to import from - * @param claim to write to - * @param append whether to append or replace - * @return length of data imported in bytes - * @throws IOException if failure to read or write stream - * - * @deprecated if needing to append to a content claim, the contents of the claim should be - * copied to a new claim and then the data to append should be written to that new claim. - */ - @Deprecated - long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException; - - /** * Exports the content of the given claim to the given destination. * * @return the size of the destination or the claim http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index af99d50..d9c3f39 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3318,6 +3318,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R lineageIdentifiers.add(parentUUID); final String newFlowFileUUID = UUID.randomUUID().toString(); + + // We need to create a new FlowFile by populating it with information from the + // Provenance Event. Particularly of note here is that we are setting the FlowFile's + // contentClaimOffset to 0. This is done for backward compatibility reasons. ContentClaim + // used to not have a concept of an offset, and the offset was tied only to the FlowFile. This + // was later refactored, so that the offset was part of the ContentClaim. If we set the offset + // in both places, we'll end up skipping over that many bytes twice instead of once (once to get + // to the beginning of the Content Claim and again to get to the offset within that Content Claim). + // To avoid this, we just always set the offset in the Content Claim itself and set the + // FlowFileRecord's contentClaimOffset to 0. final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() // Copy relevant info from source FlowFile .addAttributes(event.getPreviousAttributes()) http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 0a9acc4..18a3de1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -66,8 +66,8 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.stream.io.ByteCountingOutputStream; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.LongHolder; import org.apache.nifi.util.NiFiProperties; @@ -94,7 +94,18 @@ public class FileSystemRepository implements ContentRepository { private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true); private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>(); private final Map<String, ContainerState> containerStateMap = new HashMap<>(); - private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB + // 1 MB. This could be adjusted but 1 MB seems reasonable, as it means that we won't continually write to one + // file that keeps growing but gives us a chance to bunch together a lot of small files. Before, we had issues + // with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes + // in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of + // files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now. + private final long maxAppendClaimLength = 1024L * 1024L; + + // Queue for claims that are kept open for writing. Size of 100 is pretty arbitrary. Ideally, this will be at + // least as large as the number of threads that will be updating the repository simultaneously but we don't want + // to get too large because it will hold open up to this many FileOutputStreams. + // The queue is used to determine which claim to write to and then the corresponding Map can be used to obtain + // the OutputStream that we can use for writing to the claim. private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100); private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100); @@ -235,6 +246,13 @@ public class FileSystemRepository implements ContentRepository { executor.shutdown(); containerCleanupExecutor.shutdown(); + // Close any of the writable claim streams that are currently open. + // Other threads may be writing to these streams, and that's okay. + // If that happens, we will simply close the stream, resulting in an + // IOException that will roll back the session. Since this is called + // only on shutdown of the application, we don't have to worry about + // partially written files - on restart, we will simply start writing + // to new files and leave those trailing bytes alone. for (final OutputStream out : writableClaimStreams.values()) { try { out.close(); @@ -482,7 +500,13 @@ public class FileSystemRepository implements ContentRepository { // the queue and incrementing the associated claimant count MUST be done atomically. // This way, if the claimant count is decremented to 0, we can ensure that the // claim is not then pulled from the queue and used as another thread is destroying/archiving - // the claim. + // the claim. The logic in the remove() method dictates that the underlying file can be + // deleted (or archived) only if the claimant count becomes <= 0 AND there is no other claim on + // the queue that references that file. As a result, we need to ensure that those two conditions + // can be evaluated atomically. In order for that to be the case, we need to also treat the + // removal of a claim from the queue and the incrementing of its claimant count as an atomic + // action to ensure that the comparison of those two conditions is atomic also. As a result, + // we will synchronize on the queue while performing those actions. final long resourceOffset; synchronized (writableClaimQueue) { final ClaimLengthPair pair = writableClaimQueue.poll(); @@ -571,7 +595,9 @@ public class FileSystemRepository implements ContentRepository { // we synchronize on the queue here because if the claimant count is 0, // we need to be able to remove any instance of that resource claim from the // queue atomically (i.e., the checking of the claimant count plus removal from the queue - // must be atomic) + // must be atomic). The create() method also synchronizes on the queue whenever it + // polls from the queue and increments a claimant count in order to ensure that these + // two conditions can be checked atomically. synchronized (writableClaimQueue) { final int claimantCount = resourceClaimManager.getClaimantCount(claim); if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { @@ -647,24 +673,14 @@ public class FileSystemRepository implements ContentRepository { @Override public long importFrom(final Path content, final ContentClaim claim) throws IOException { - return importFrom(content, claim, false); - } - - @Override - public long importFrom(final Path content, final ContentClaim claim, final boolean append) throws IOException { try (final InputStream in = Files.newInputStream(content, StandardOpenOption.READ)) { - return importFrom(in, claim, append); + return importFrom(in, claim); } } @Override public long importFrom(final InputStream content, final ContentClaim claim) throws IOException { - return importFrom(content, claim, false); - } - - @Override - public long importFrom(final InputStream content, final ContentClaim claim, final boolean append) throws IOException { - try (final OutputStream out = write(claim, append)) { + try (final OutputStream out = write(claim, false)) { return StreamUtils.copy(content, out); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 6c1626c..7c7cade 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -352,33 +352,21 @@ public class VolatileContentRepository implements ContentRepository { @Override public long importFrom(final Path content, final ContentClaim claim) throws IOException { - return importFrom(content, claim, false); - } - - @Override - public long importFrom(final Path content, final ContentClaim claim, boolean append) throws IOException { try (final InputStream in = new FileInputStream(content.toFile())) { - return importFrom(in, claim, append); + return importFrom(in, claim); } } @Override - public long importFrom(final InputStream content, final ContentClaim claim) throws IOException { - return importFrom(content, claim, false); - } - - @Override - public long importFrom(final InputStream in, final ContentClaim claim, final boolean append) throws IOException { + public long importFrom(final InputStream in, final ContentClaim claim) throws IOException { final ContentClaim backupClaim = getBackupClaim(claim); if (backupClaim == null) { final ContentBlock content = getContent(claim); - if (!append) { - content.reset(); - } + content.reset(); return StreamUtils.copy(in, content.write()); } else { - return getBackupRepository().importFrom(in, claim, append); + return getBackupRepository().importFrom(in, claim); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index a17bd40..b573006 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.controller; -import org.apache.nifi.controller.FileSystemSwapManager; import static org.junit.Assert.assertEquals; import java.io.BufferedInputStream; http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index b1fd4c7..ba34148 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -1090,28 +1090,12 @@ public class TestStandardProcessSession { } @Override - public long importFrom(Path content, ContentClaim claim, boolean append) throws IOException { - if (append) { - throw new UnsupportedOperationException(); - } - return importFrom(content, claim); - } - - @Override public long importFrom(InputStream content, ContentClaim claim) throws IOException { Files.copy(content, getPath(claim)); return Files.size(getPath(claim)); } @Override - public long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException { - if (append) { - throw new UnsupportedOperationException(); - } - return importFrom(content, claim); - } - - @Override public long exportTo(ContentClaim claim, Path destination, boolean append) throws IOException { throw new UnsupportedOperationException(); }
