Repository: nifi
Updated Branches:
  refs/heads/master d421e3c24 -> 4f6c1cfff


NIFI-744: Addressed feedback from review, mostly adding documentation to a few 
points in the code


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15a8699d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15a8699d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15a8699d

Branch: refs/heads/master
Commit: 15a8699dc49d6d5ac4a450f3eaea52def7addb63
Parents: 68d94cc
Author: Mark Payne <[email protected]>
Authored: Fri Aug 21 10:52:40 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri Aug 21 11:08:34 2015 -0400

----------------------------------------------------------------------
 .../repository/ContentRepository.java           | 34 --------------
 .../apache/nifi/controller/FlowController.java  | 10 ++++
 .../repository/FileSystemRepository.java        | 48 +++++++++++++-------
 .../repository/VolatileContentRepository.java   | 20 ++------
 .../controller/TestFileSystemSwapManager.java   |  1 -
 .../repository/TestStandardProcessSession.java  | 16 -------
 6 files changed, 46 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index 8d0bdb3..b1ea87c 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -165,24 +165,6 @@ public interface ContentRepository {
     long importFrom(Path content, ContentClaim claim) throws IOException;
 
     /**
-     * Imports content from the given path to the specified claim, appending or
-     * replacing the current claim, according to the value of the append
-     * argument
-     *
-     * @return the size of the claim
-     * @param content to import from
-     * @param claim the claim to write imported content to
-     * @param append if true, the content will be appended to the claim; if
-     *        false, the content will replace the contents of the claim
-     * @throws IOException if unable to read content
-     *
-     * @deprecated if needing to append to a content claim, the contents of 
the claim should be
-     *             copied to a new claim and then the data to append should be 
written to that new claim.
-     */
-    @Deprecated
-    long importFrom(Path content, ContentClaim claim, boolean append) throws 
IOException;
-
-    /**
      * Imports content from the given stream creating a new content object and
      * claim within the repository.
      *
@@ -194,22 +176,6 @@ public interface ContentRepository {
     long importFrom(InputStream content, ContentClaim claim) throws 
IOException;
 
     /**
-     * Imports content from the given stream, appending or replacing the 
current
-     * claim, according to the value of the appen dargument
-     *
-     * @param content to import from
-     * @param claim to write to
-     * @param append whether to append or replace
-     * @return length of data imported in bytes
-     * @throws IOException if failure to read or write stream
-     *
-     * @deprecated if needing to append to a content claim, the contents of 
the claim should be
-     * copied to a new claim and then the data to append should be written to 
that new claim.
-     */
-    @Deprecated
-    long importFrom(InputStream content, ContentClaim claim, boolean append) 
throws IOException;
-
-    /**
      * Exports the content of the given claim to the given destination.
      *
      * @return the size of the destination or the claim

http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index af99d50..d9c3f39 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3318,6 +3318,16 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         lineageIdentifiers.add(parentUUID);
 
         final String newFlowFileUUID = UUID.randomUUID().toString();
+
+        // We need to create a new FlowFile by populating it with information 
from the
+        // Provenance Event. Particularly of note here is that we are setting 
the FlowFile's
+        // contentClaimOffset to 0. This is done for backward compatibility 
reasons. ContentClaim
+        // used to not have a concept of an offset, and the offset was tied 
only to the FlowFile. This
+        // was later refactored, so that the offset was part of the 
ContentClaim. If we set the offset
+        // in both places, we'll end up skipping over that many bytes twice 
instead of once (once to get
+        // to the beginning of the Content Claim and again to get to the 
offset within that Content Claim).
+        // To avoid this, we just always set the offset in the Content Claim 
itself and set the
+        // FlowFileRecord's contentClaimOffset to 0.
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
             // Copy relevant info from source FlowFile
             .addAttributes(event.getPreviousAttributes())

http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 0a9acc4..18a3de1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -66,8 +66,8 @@ import 
org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.NiFiProperties;
@@ -94,7 +94,18 @@ public class FileSystemRepository implements 
ContentRepository {
     private final ScheduledExecutorService executor = new FlowEngine(4, 
"FileSystemRepository Workers", true);
     private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> 
reclaimable = new ConcurrentHashMap<>();
     private final Map<String, ContainerState> containerStateMap = new 
HashMap<>();
-    private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB
+    // 1 MB. This could be adjusted but 1 MB seems reasonable, as it means 
that we won't continually write to one
+    // file that keeps growing but gives us a chance to bunch together a lot 
of small files. Before, we had issues
+    // with creating and deleting too many files, as we had to delete 100's of 
thousands of files every 2 minutes
+    // in order to avoid backpressure on session commits. With 1 MB as the 
target file size, 100's of thousands of
+    // files would mean that we are writing gigabytes per second - quite a bit 
faster than any disks can handle now.
+    private final long maxAppendClaimLength = 1024L * 1024L;
+
+    // Queue for claims that are kept open for writing. Size of 100 is pretty 
arbitrary. Ideally, this will be at
+    // least as large as the number of threads that will be updating the 
repository simultaneously but we don't want
+    // to get too large because it will hold open up to this many 
FileOutputStreams.
+    // The queue is used to determine which claim to write to and then the 
corresponding Map can be used to obtain
+    // the OutputStream that we can use for writing to the claim.
     private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new 
LinkedBlockingQueue<>(100);
     private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> 
writableClaimStreams = new ConcurrentHashMap<>(100);
 
@@ -235,6 +246,13 @@ public class FileSystemRepository implements 
ContentRepository {
         executor.shutdown();
         containerCleanupExecutor.shutdown();
 
+        // Close any of the writable claim streams that are currently open.
+        // Other threads may be writing to these streams, and that's okay.
+        // If that happens, we will simply close the stream, resulting in an
+        // IOException that will roll back the session. Since this is called
+        // only on shutdown of the application, we don't have to worry about
+        // partially written files - on restart, we will simply start writing
+        // to new files and leave those trailing bytes alone.
         for (final OutputStream out : writableClaimStreams.values()) {
             try {
                 out.close();
@@ -482,7 +500,13 @@ public class FileSystemRepository implements 
ContentRepository {
         // the queue and incrementing the associated claimant count MUST be 
done atomically.
         // This way, if the claimant count is decremented to 0, we can ensure 
that the
         // claim is not then pulled from the queue and used as another thread 
is destroying/archiving
-        // the claim.
+        // the claim. The logic in the remove() method dictates that the 
underlying file can be
+        // deleted (or archived) only if the claimant count becomes <= 0 AND 
there is no other claim on
+        // the queue that references that file. As a result, we need to ensure 
that those two conditions
+        // can be evaluated atomically. In order for that to be the case, we 
need to also treat the
+        // removal of a claim from the queue and the incrementing of its 
claimant count as an atomic
+        // action to ensure that the comparison of those two conditions is 
atomic also. As a result,
+        // we will synchronize on the queue while performing those actions.
         final long resourceOffset;
         synchronized (writableClaimQueue) {
             final ClaimLengthPair pair = writableClaimQueue.poll();
@@ -571,7 +595,9 @@ public class FileSystemRepository implements 
ContentRepository {
         // we synchronize on the queue here because if the claimant count is 0,
         // we need to be able to remove any instance of that resource claim 
from the
         // queue atomically (i.e., the checking of the claimant count plus 
removal from the queue
-        // must be atomic)
+        // must be atomic). The create() method also synchronizes on the queue 
whenever it
+        // polls from the queue and increments a claimant count in order to 
ensure that these
+        // two conditions can be checked atomically.
         synchronized (writableClaimQueue) {
             final int claimantCount = 
resourceClaimManager.getClaimantCount(claim);
             if (claimantCount > 0 || writableClaimQueue.contains(new 
ClaimLengthPair(claim, null))) {
@@ -647,24 +673,14 @@ public class FileSystemRepository implements 
ContentRepository {
 
     @Override
     public long importFrom(final Path content, final ContentClaim claim) 
throws IOException {
-        return importFrom(content, claim, false);
-    }
-
-    @Override
-    public long importFrom(final Path content, final ContentClaim claim, final 
boolean append) throws IOException {
         try (final InputStream in = Files.newInputStream(content, 
StandardOpenOption.READ)) {
-            return importFrom(in, claim, append);
+            return importFrom(in, claim);
         }
     }
 
     @Override
     public long importFrom(final InputStream content, final ContentClaim 
claim) throws IOException {
-        return importFrom(content, claim, false);
-    }
-
-    @Override
-    public long importFrom(final InputStream content, final ContentClaim 
claim, final boolean append) throws IOException {
-        try (final OutputStream out = write(claim, append)) {
+        try (final OutputStream out = write(claim, false)) {
             return StreamUtils.copy(content, out);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 6c1626c..7c7cade 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -352,33 +352,21 @@ public class VolatileContentRepository implements 
ContentRepository {
 
     @Override
     public long importFrom(final Path content, final ContentClaim claim) 
throws IOException {
-        return importFrom(content, claim, false);
-    }
-
-    @Override
-    public long importFrom(final Path content, final ContentClaim claim, 
boolean append) throws IOException {
         try (final InputStream in = new FileInputStream(content.toFile())) {
-            return importFrom(in, claim, append);
+            return importFrom(in, claim);
         }
     }
 
     @Override
-    public long importFrom(final InputStream content, final ContentClaim 
claim) throws IOException {
-        return importFrom(content, claim, false);
-    }
-
-    @Override
-    public long importFrom(final InputStream in, final ContentClaim claim, 
final boolean append) throws IOException {
+    public long importFrom(final InputStream in, final ContentClaim claim) 
throws IOException {
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
             final ContentBlock content = getContent(claim);
-            if (!append) {
-                content.reset();
-            }
+            content.reset();
 
             return StreamUtils.copy(in, content.write());
         } else {
-            return getBackupRepository().importFrom(in, claim, append);
+            return getBackupRepository().importFrom(in, claim);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index a17bd40..b573006 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import org.apache.nifi.controller.FileSystemSwapManager;
 import static org.junit.Assert.assertEquals;
 
 import java.io.BufferedInputStream;

http://git-wip-us.apache.org/repos/asf/nifi/blob/15a8699d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index b1fd4c7..ba34148 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -1090,28 +1090,12 @@ public class TestStandardProcessSession {
         }
 
         @Override
-        public long importFrom(Path content, ContentClaim claim, boolean 
append) throws IOException {
-            if (append) {
-                throw new UnsupportedOperationException();
-            }
-            return importFrom(content, claim);
-        }
-
-        @Override
         public long importFrom(InputStream content, ContentClaim claim) throws 
IOException {
             Files.copy(content, getPath(claim));
             return Files.size(getPath(claim));
         }
 
         @Override
-        public long importFrom(InputStream content, ContentClaim claim, 
boolean append) throws IOException {
-            if (append) {
-                throw new UnsupportedOperationException();
-            }
-            return importFrom(content, claim);
-        }
-
-        @Override
         public long exportTo(ContentClaim claim, Path destination, boolean 
append) throws IOException {
             throw new UnsupportedOperationException();
         }

Reply via email to