NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim and 
allowed resource claim to be reused across sessions


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/68d94cc0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/68d94cc0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/68d94cc0

Branch: refs/heads/master
Commit: 68d94cc01b59187e71f32624c5251d332d39e8ab
Parents: 706edeb
Author: Mark Payne <[email protected]>
Authored: Tue Aug 18 19:55:33 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri Aug 21 11:08:34 2015 -0400

----------------------------------------------------------------------
 .../repository/ContentRepository.java           |  14 +-
 .../repository/FlowFileRepository.java          |   4 +-
 .../repository/FlowFileSwapManager.java         |   6 +-
 .../repository/claim/ContentClaim.java          |  31 +-
 .../repository/claim/ContentClaimManager.java   | 142 -----
 .../repository/claim/ResourceClaim.java         |  54 ++
 .../repository/claim/ResourceClaimManager.java  | 135 +++++
 .../stream/io/ByteCountingOutputStream.java     |   4 +
 .../SynchronizedByteCountingOutputStream.java   |  66 +++
 .../nifi-framework/nifi-framework-core/pom.xml  |   1 +
 .../nifi/controller/FileSystemSwapManager.java  |  63 ++-
 .../apache/nifi/controller/FlowController.java  | 168 +++---
 .../repository/FileSystemRepository.java        | 563 ++++++++++++++-----
 .../repository/StandardFlowFileRecord.java      |   6 +-
 .../repository/StandardProcessSession.java      | 389 ++++---------
 .../repository/VolatileContentRepository.java   |  49 +-
 .../repository/VolatileFlowFileRepository.java  |  46 +-
 .../WriteAheadFlowFileRepository.java           | 101 +++-
 .../repository/claim/StandardContentClaim.java  | 129 ++---
 .../claim/StandardContentClaimManager.java      | 145 -----
 .../repository/claim/StandardResourceClaim.java | 134 +++++
 .../claim/StandardResourceClaimManager.java     | 145 +++++
 .../controller/TestFileSystemSwapManager.java   |  24 +-
 .../repository/TestFileSystemRepository.java    | 116 +++-
 .../repository/TestStandardProcessSession.java  | 378 ++++---------
 .../TestVolatileContentRepository.java          |  17 +-
 .../TestWriteAheadFlowFileRepository.java       |   6 +-
 27 files changed, 1645 insertions(+), 1291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index ee3ead9..8d0bdb3 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.Set;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * Defines the capabilities of a content repository. Append options are not
@@ -42,7 +42,7 @@ public interface ContentRepository {
      * @param claimManager to handle claims
      * @throws java.io.IOException if unable to init
      */
-    void initialize(ContentClaimManager claimManager) throws IOException;
+    void initialize(ResourceClaimManager claimManager) throws IOException;
 
     /**
      * Shuts down the Content Repository, freeing any resources that may be
@@ -173,9 +173,13 @@ public interface ContentRepository {
      * @param content to import from
      * @param claim the claim to write imported content to
      * @param append if true, the content will be appended to the claim; if
-     * false, the content will replace the contents of the claim
+     *        false, the content will replace the contents of the claim
      * @throws IOException if unable to read content
+     *
+     * @deprecated if needing to append to a content claim, the contents of 
the claim should be
+     *             copied to a new claim and then the data to append should be 
written to that new claim.
      */
+    @Deprecated
     long importFrom(Path content, ContentClaim claim, boolean append) throws 
IOException;
 
     /**
@@ -198,7 +202,11 @@ public interface ContentRepository {
      * @param append whether to append or replace
      * @return length of data imported in bytes
      * @throws IOException if failure to read or write stream
+     *
+     * @deprecated if needing to append to a content claim, the contents of 
the claim should be
+     * copied to a new claim and then the data to append should be written to 
that new claim.
      */
+    @Deprecated
     long importFrom(InputStream content, ContentClaim claim, boolean append) 
throws IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 5e59e04..58fc6b3 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -22,7 +22,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * Implementations must be thread safe
@@ -38,7 +38,7 @@ public interface FlowFileRepository extends Closeable {
      * @param claimManager for handling claims
      * @throws java.io.IOException if unable to initialize repository
      */
-    void initialize(ContentClaimManager claimManager) throws IOException;
+    void initialize(ResourceClaimManager claimManager) throws IOException;
 
     /**
      * @return the maximum number of bytes that can be stored in the underlying

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 869e2b3..2e5be11 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
 
 /**
@@ -38,7 +38,7 @@ public interface FlowFileSwapManager {
      * @param reporter the EventReporter that can be used for notifying users 
of
      * important events
      */
-    void start(FlowFileRepository flowFileRepository, QueueProvider 
queueProvider, ContentClaimManager claimManager, EventReporter reporter);
+    void start(FlowFileRepository flowFileRepository, QueueProvider 
queueProvider, ResourceClaimManager claimManager, EventReporter reporter);
 
     /**
      * Shuts down the manager
@@ -59,5 +59,5 @@ public interface FlowFileSwapManager {
      * @param claimManager manager
      * @return how many flowfiles have been recovered
      */
-    long recoverSwappedFlowFiles(QueueProvider connectionProvider, 
ContentClaimManager claimManager);
+    long recoverSwappedFlowFiles(QueueProvider connectionProvider, 
ResourceClaimManager claimManager);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
index 53cc44f..5c1d76b 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -18,35 +18,30 @@ package org.apache.nifi.controller.repository.claim;
 
 /**
  * <p>
- * A ContentClaim is a reference to a given flow file's content. Multiple flow
- * files may reference the same content by both having the same content
- * claim.</p>
+ * A reference to a section of a {@link ResourceClaim}, which may or may not 
encompass
+ * the entire ResourceClaim. Multiple FlowFiles may reference the same content 
by both
+ * having the same content claim.
+ * </p>
  *
  * <p>
- * Must be thread safe</p>
- *
+ * Must be thread safe
+ * </p>
  */
 public interface ContentClaim extends Comparable<ContentClaim> {
 
     /**
-     * @return the unique identifier for this claim
-     */
-    String getId();
-
-    /**
-     * @return the container identifier in which this claim is held
+     * @return the ResourceClaim that this ContentClaim references
      */
-    String getContainer();
+    ResourceClaim getResourceClaim();
 
     /**
-     * @return the section within a given container the claim is held
+     * @return the offset into the ResourceClaim where the content for this
+     * claim begins
      */
-    String getSection();
+    long getOffset();
 
     /**
-     * @return Indicates whether or not the Claim is loss-tolerant. If so, we 
will
-     * attempt to keep the content but will not sacrifice a great deal of
-     * performance to do so
+     * @return the length of this ContentClaim
      */
-    boolean isLossTolerant();
+    long getLength();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
deleted file mode 100644
index bffcec3..0000000
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Responsible for managing all ContentClaims that are used in the application
- */
-public interface ContentClaimManager {
-
-    /**
-     * Creates a new Content Claim with the given id, container, section, and
-     * loss tolerance.
-     *
-     * @param id of claim
-     * @param container of claim
-     * @param section of claim
-     * @param lossTolerant of claim
-     * @return new claim
-     */
-    ContentClaim newContentClaim(String container, String section, String id, 
boolean lossTolerant);
-
-    /**
-     * @param claim to obtain reference count for
-     * @return the number of FlowFiles that hold a claim to a particular piece
-     * of FlowFile content
-     */
-    int getClaimantCount(ContentClaim claim);
-
-    /**
-     * Decreases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count
-     *
-     * @param claim to decrement claimants on
-     * @return new claimaint count
-     */
-    int decrementClaimantCount(ContentClaim claim);
-
-    /**
-     * Increases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count
-     *
-     * @param claim to increment claims on
-     * @return new claimant count
-     */
-    int incrementClaimantCount(ContentClaim claim);
-
-    /**
-     * Increases by 1 the count of how many FlowFiles hold a claim to a
-     * particular piece of FlowFile content and returns the new count.
-     *
-     * If it is known that the Content Claim whose count is being incremented 
is
-     * a newly created ContentClaim, this method should be called with a value
-     * of {@code true} as the second argument, as it may allow the manager to
-     * optimize its tasks, knowing that the Content Claim cannot be referenced
-     * by any other component
-     *
-     * @param claim to increment
-     * @param newClaim provides a hint that no other process can have access 
to this
-     * claim right now
-     * @return new claim count
-     */
-    int incrementClaimantCount(ContentClaim claim, boolean newClaim);
-
-    /**
-     * Indicates that the given ContentClaim can now be destroyed by the
-     * appropriate Content Repository. This should be done only after it is
-     * guaranteed that the FlowFile Repository has been synchronized with its
-     * underlying storage component. This way, we avoid the following sequence
-     * of events:
-     * <ul>
-     * <li>FlowFile Repository is updated to indicate that FlowFile F no longer
-     * depends on ContentClaim C</li>
-     * <li>ContentClaim C is no longer needed and is destroyed</li>
-     * <li>The Operating System crashes or there is a power failure</li>
-     * <li>Upon restart, the FlowFile Repository was not synchronized with its
-     * underlying storage mechanism and as such indicates that FlowFile F needs
-     * ContentClaim C.</li>
-     * <li>Since ContentClaim C has already been destroyed, it is inaccessible,
-     * and FlowFile F's Content is not found, so the FlowFile is removed,
-     * resulting in data loss.</li>
-     * </ul>
-     *
-     * <p>
-     * Using this method of marking the ContentClaim as destructable only when
-     * the FlowFile repository has been synced with the underlying storage
-     * mechanism, we can ensure that on restart, we will not point to this
-     * unneeded claim. As such, it is now safe to destroy the contents.
-     * </p>
-     *
-     * @param claim to mark as now destructable
-     */
-    void markDestructable(ContentClaim claim);
-
-    /**
-     * Drains up to {@code maxElements} Content Claims from the internal queue
-     * of destructable content claims to the given {@code destination} so that
-     * they can be destroyed.
-     *
-     * @param destination to drain to
-     * @param maxElements max items to drain
-     */
-    void drainDestructableClaims(Collection<ContentClaim> destination, int 
maxElements);
-
-    /**
-     * Drains up to {@code maxElements} Content Claims from the internal queue
-     * of destructable content claims to the given {@code destination} so that
-     * they can be destroyed. If no ContentClaim is ready to be destroyed at
-     * this time, will wait up to the specified amount of time before 
returning.
-     * If, after the specified amount of time, there is still no ContentClaim
-     * ready to be destroyed, the method will return without having added
-     * anything to the given {@code destination}.
-     *
-     * @param destination to drain to
-     * @param maxElements max items to drain
-     * @param timeout maximum time to wait
-     * @param unit unit of time to wait
-     */
-    void drainDestructableClaims(Collection<ContentClaim> destination, int 
maxElements, long timeout, TimeUnit unit);
-
-    /**
-     * Clears the manager's memory of any and all ContentClaims that it knows
-     * about
-     */
-    void purge();
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
new file mode 100644
index 0000000..d448632
--- /dev/null
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+
+/**
+ * <p>
+ * Represents a resource that can be provided by a {@link ContentRepository}
+ * </p>
+ *
+ * <p>
+ * MUST BE THREAD-SAFE!
+ * </p>
+ */
+public interface ResourceClaim extends Comparable<ResourceClaim> {
+
+    /**
+     * @return the unique identifier for this claim
+     */
+    String getId();
+
+    /**
+     * @return the container identifier in which this claim is held
+     */
+    String getContainer();
+
+    /**
+     * @return the section within a given container the claim is held
+     */
+    String getSection();
+
+    /**
+     * @return Indicates whether or not the Claim is loss-tolerant. If so, we 
will
+     *         attempt to keep the content but will not sacrifice a great deal 
of
+     *         performance to do so
+     */
+    boolean isLossTolerant();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
new file mode 100644
index 0000000..01f4c65
--- /dev/null
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Responsible for managing all ResourceClaims that are used in the application
+ */
+public interface ResourceClaimManager {
+
+    /**
+     * Creates a new Resource Claim with the given id, container, section, and
+     * loss tolerance.
+     *
+     * @param id of claim
+     * @param container of claim
+     * @param section of claim
+     * @param lossTolerant of claim
+     * @return new claim
+     */
+    ResourceClaim newResourceClaim(String container, String section, String 
id, boolean lossTolerant);
+
+    /**
+     * @param claim to obtain reference count for
+     * @return the number of FlowFiles that hold a claim to a particular piece
+     * of FlowFile content
+     */
+    int getClaimantCount(ResourceClaim claim);
+
+    /**
+     * Decreases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count
+     *
+     * @param claim to decrement claimants on
+     * @return new claimant count
+     */
+    int decrementClaimantCount(ResourceClaim claim);
+
+    /**
+     * Increases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count
+     *
+     * @param claim to increment claims on
+     * @return new claimant count
+     */
+    int incrementClaimantCount(ResourceClaim claim);
+
+    /**
+     * Increases by 1 the count of how many FlowFiles hold a claim to a
+     * particular piece of FlowFile content and returns the new count.
+     *
+     * If it is known that the ResourceClaim whose count is being incremented 
is
+     * a newly created ResourceClaim, this method should be called with a value
+     * of {@code true} as the second argument, as it may allow the manager to
+     * optimize its tasks, knowing that the ResourceClaim cannot be referenced
+     * by any other component
+     *
+     * @param claim to increment
+     * @param newClaim provides a hint that no other process can have access 
to this
+     *            claim right now
+     * @return new claim count
+     */
+    int incrementClaimantCount(ResourceClaim claim, boolean newClaim);
+
+    /**
+     * Indicates that the given ResourceClaim can now be destroyed by the
+     * appropriate Content Repository. This should be done only after it is
+     * guaranteed that the FlowFile Repository has been synchronized with its
+     * underlying storage component. This way, we avoid the following sequence
+     * of events:
+     * <ul>
+     * <li>FlowFile Repository is updated to indicate that FlowFile F no 
longer depends on ResourceClaim C</li>
+     * <li>ResourceClaim C is no longer needed and is destroyed</li>
+     * <li>The Operating System crashes or there is a power failure</li>
+     * <li>Upon restart, the FlowFile Repository was not synchronized with its 
underlying storage mechanism and as such indicates that FlowFile F needs 
ResourceClaim C.</li>
+     * <li>Since ResourceClaim C has already been destroyed, it is 
inaccessible, and FlowFile F's Content is not found, so the FlowFile is 
removed, resulting in data loss.</li>
+     * </ul>
+     *
+     * <p>
+     * Using this method of marking the ResourceClaim as destructable only 
when the FlowFile repository has been synced with the underlying storage 
mechanism, we can ensure that on restart, we will
+     * not point to this unneeded claim. As such, it is now safe to destroy 
the contents.
+     * </p>
+     *
+     * @param claim to mark as now destructable
+     */
+    void markDestructable(ResourceClaim claim);
+
+    /**
+     * Drains up to {@code maxElements} Resource Claims from the internal queue
+     * of destructable resource claims to the given {@code destination} so that
+     * they can be destroyed.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     */
+    void drainDestructableClaims(Collection<ResourceClaim> destination, int 
maxElements);
+
+    /**
+     * Drains up to {@code maxElements} Resource Claims from the internal queue
+     * of destructable resource claims to the given {@code destination} so that
+     * they can be destroyed. If no ResourceClaim is ready to be destroyed at
+     * this time, will wait up to the specified amount of time before 
returning.
+     * If, after the specified amount of time, there is still no ResourceClaim
+     * ready to be destroyed, the method will return without having added
+     * anything to the given {@code destination}.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     * @param timeout maximum time to wait
+     * @param unit unit of time to wait
+     */
+    void drainDestructableClaims(Collection<ResourceClaim> destination, int 
maxElements, long timeout, TimeUnit unit);
+
+    /**
+     * Clears the manager's memory of any and all ResourceClaims that it knows
+     * about
+     */
+    void purge();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index 9bbd45e..47f236d 100644
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -63,4 +63,8 @@ public class ByteCountingOutputStream extends OutputStream {
     public void close() throws IOException {
         out.close();
     }
+
+    public OutputStream getWrappedStream() {
+        return out;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java
new file mode 100644
index 0000000..e617829
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class SynchronizedByteCountingOutputStream extends 
ByteCountingOutputStream {
+
+    public SynchronizedByteCountingOutputStream(final OutputStream out) {
+        super(out);
+    }
+
+    public SynchronizedByteCountingOutputStream(final OutputStream out, final 
long byteCount) {
+        super(out, byteCount);
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        super.flush();
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        super.close();
+    }
+
+    @Override
+    public synchronized long getBytesWritten() {
+        return super.getBytesWritten();
+    }
+
+    @Override
+    public synchronized OutputStream getWrappedStream() {
+        return super.getWrappedStream();
+    }
+
+    @Override
+    public synchronized void write(final byte[] b) throws IOException {
+        super.write(b);
+    }
+
+    @Override
+    public synchronized void write(final int b) throws IOException {
+        super.write(b);
+    }
+
+    @Override
+    public synchronized void write(final byte[] b, final int off, final int 
len) throws IOException {
+        super.write(b, off, len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 8d64143..f48988a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -135,6 +135,7 @@
                         <exclude>src/test/resources/conf/0bytes.xml</exclude>
                         
<exclude>src/test/resources/conf/termination-only.xml</exclude>
                         <exclude>src/test/resources/hello.txt</exclude>
+                        <exclude>src/test/resources/bye.txt</exclude>
                         
<exclude>src/test/resources/old-swap-file.swap</exclude>
                     </excludes>
                 </configuration>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 604dba9..c829566 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -61,7 +61,9 @@ import 
org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.QueueProvider;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.stream.io.BufferedOutputStream;
@@ -83,7 +85,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
     private static final Pattern SWAP_FILE_PATTERN = 
Pattern.compile("\\d+-.+\\.swap");
     private static final Pattern TEMP_SWAP_FILE_PATTERN = 
Pattern.compile("\\d+-.+\\.swap\\.part");
 
-    public static final int SWAP_ENCODING_VERSION = 6;
+    public static final int SWAP_ENCODING_VERSION = 7;
     public static final String EVENT_CATEGORY = "Swap FlowFiles";
 
     private final ScheduledExecutorService swapQueueIdentifierExecutor;
@@ -98,7 +100,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
     private final long swapOutMillis;
     private final int swapOutThreadCount;
 
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     private static final Logger logger = 
LoggerFactory.getLogger(FileSystemSwapManager.class);
 
@@ -138,7 +140,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
     }
 
     @Override
-    public synchronized void start(final FlowFileRepository 
flowFileRepository, final QueueProvider connectionProvider, final 
ContentClaimManager claimManager, final EventReporter eventReporter) {
+    public synchronized void start(final FlowFileRepository 
flowFileRepository, final QueueProvider connectionProvider, final 
ResourceClaimManager claimManager, final EventReporter eventReporter) {
         this.claimManager = claimManager;
         this.flowFileRepository = flowFileRepository;
         this.eventReporter = eventReporter;
@@ -184,11 +186,14 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
                     out.writeBoolean(false);
                 } else {
                     out.writeBoolean(true);
-                    out.writeUTF(claim.getId());
-                    out.writeUTF(claim.getContainer());
-                    out.writeUTF(claim.getSection());
+                    final ResourceClaim resourceClaim = 
claim.getResourceClaim();
+                    out.writeUTF(resourceClaim.getId());
+                    out.writeUTF(resourceClaim.getContainer());
+                    out.writeUTF(resourceClaim.getSection());
+                    out.writeLong(claim.getOffset());
+                    out.writeLong(claim.getLength());
                     out.writeLong(flowFile.getContentClaimOffset());
-                    out.writeBoolean(claim.isLossTolerant());
+                    out.writeBoolean(resourceClaim.isLossTolerant());
                 }
 
                 final Map<String, String> attributes = 
flowFile.getAttributes();
@@ -226,7 +231,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         }
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, 
final FlowFileQueue queue, final ContentClaimManager claimManager) throws 
IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, 
final FlowFileQueue queue, final ResourceClaimManager claimManager) throws 
IOException {
         final int swapEncodingVersion = in.readInt();
         if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
             throw new IOException("Cannot swap FlowFiles in from SwapFile 
because the encoding version is "
@@ -245,7 +250,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
     }
 
     static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, 
final int numFlowFiles, final FlowFileQueue queue,
-            final int serializationVersion, final boolean 
incrementContentClaims, final ContentClaimManager claimManager) throws 
IOException {
+            final int serializationVersion, final boolean 
incrementContentClaims, final ResourceClaimManager claimManager) throws 
IOException {
         final List<FlowFileRecord> flowFiles = new ArrayList<>();
         for (int i = 0; i < numFlowFiles; i++) {
             // legacy encoding had an "action" because it used to be coupled 
with FlowFile Repository code
@@ -292,6 +297,17 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
 
                 final String container = in.readUTF();
                 final String section = in.readUTF();
+
+                final long resourceOffset;
+                final long resourceLength;
+                if (serializationVersion < 6) {
+                    resourceOffset = 0L;
+                    resourceLength = -1L;
+                } else {
+                    resourceOffset = in.readLong();
+                    resourceLength = in.readLong();
+                }
+
                 final long claimOffset = in.readLong();
 
                 final boolean lossTolerant;
@@ -301,10 +317,12 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
                     lossTolerant = false;
                 }
 
-                final ContentClaim claim = 
claimManager.newContentClaim(container, section, claimId, lossTolerant);
+                final ResourceClaim resourceClaim = 
claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+                final StandardContentClaim claim = new 
StandardContentClaim(resourceClaim, resourceOffset);
+                claim.setLength(resourceLength);
 
                 if (incrementContentClaims) {
-                    claimManager.incrementClaimantCount(claim);
+                    claimManager.incrementClaimantCount(resourceClaim);
                 }
 
                 ffBuilder.contentClaim(claim);
@@ -353,16 +371,16 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
             throw new EOFException();
         }
         if (firstValue == 0xff && secondValue == 0xff) {
-            int ch1 = in.read();
-            int ch2 = in.read();
-            int ch3 = in.read();
-            int ch4 = in.read();
+            final int ch1 = in.read();
+            final int ch2 = in.read();
+            final int ch3 = in.read();
+            final int ch4 = in.read();
             if ((ch1 | ch2 | ch3 | ch4) < 0) {
                 throw new EOFException();
             }
-            return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
         } else {
-            return ((firstValue << 8) + (secondValue));
+            return (firstValue << 8) + secondValue;
         }
     }
 
@@ -422,7 +440,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
                 final FlowFileQueue flowFileQueue = entry.getKey();
 
                 // if queue is more than 60% of its swap threshold, don't swap 
flowfiles in
-                if (flowFileQueue.unswappedSize() >= ((float) 
flowFileQueue.getSwapThreshold() * 0.6F)) {
+                if (flowFileQueue.unswappedSize() >= 
flowFileQueue.getSwapThreshold() * 0.6F) {
                     continue;
                 }
 
@@ -432,7 +450,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
                         final Queue<File> queue = queueLockWrapper.getQueue();
 
                         // Swap FlowFiles in until we hit 90% of the 
threshold, or until we're out of files.
-                        while (flowFileQueue.unswappedSize() < ((float) 
flowFileQueue.getSwapThreshold() * 0.9F)) {
+                        while (flowFileQueue.unswappedSize() < 
flowFileQueue.getSwapThreshold() * 0.9F) {
                             File swapFile = null;
                             try {
                                 swapFile = queue.poll();
@@ -545,7 +563,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
                         QueueLockWrapper swapQueue = 
swapMap.get(flowFileQueue);
                         if (swapQueue == null) {
                             swapQueue = new QueueLockWrapper(new 
LinkedBlockingQueue<File>());
-                            QueueLockWrapper oldQueue = 
swapMap.putIfAbsent(flowFileQueue, swapQueue);
+                            final QueueLockWrapper oldQueue = 
swapMap.putIfAbsent(flowFileQueue, swapQueue);
                             if (oldQueue != null) {
                                 swapQueue = oldQueue;
                             }
@@ -567,7 +585,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
      * @return the largest FlowFile ID that was recovered
      */
     @Override
-    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, 
final ContentClaimManager claimManager) {
+    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, 
final ResourceClaimManager claimManager) {
         final File[] swapFiles = storageDirectory.listFiles(new 
FilenameFilter() {
             @Override
             public boolean accept(final File dir, final String name) {
@@ -680,6 +698,7 @@ public class FileSystemSwapManager implements 
FlowFileSwapManager {
         }
     }
 
+    @Override
     public void shutdown() {
         swapQueueIdentifierExecutor.shutdownNow();
         swapInExecutor.shutdownNow();

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3d78b3a..af99d50 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -98,9 +98,12 @@ import 
org.apache.nifi.controller.repository.StandardCounterRepository;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
 import org.apache.nifi.controller.scheduling.ProcessContextFactory;
@@ -176,7 +179,6 @@ import 
org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
@@ -282,7 +284,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final NodeProtocolSender protocolSender;
 
     private final ScheduledExecutorService clusterTaskExecutor = new 
FlowEngine(3, "Clustering Tasks");
-    private final ContentClaimManager contentClaimManager = new 
StandardContentClaimManager();
+    private final ResourceClaimManager contentClaimManager = new 
StandardResourceClaimManager();
 
     // guarded by rwLock
     /**
@@ -495,7 +497,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
     }
 
-    private static FlowFileRepository createFlowFileRepository(final 
NiFiProperties properties, final ContentClaimManager contentClaimManager) {
+    private static FlowFileRepository createFlowFileRepository(final 
NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
         final String implementationClassName = 
properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, 
DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create FlowFile Repository 
because the NiFi Properties is missing the following property: "
@@ -3108,7 +3110,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public boolean isInputAvailable() {
                 try {
-                    return 
contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(),
 event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier()));
+                    return 
contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(),
 event.getPreviousContentClaimSection(),
+                        event.getPreviousContentClaimIdentifier(), 
event.getPreviousContentClaimOffset()));
                 } catch (final IOException e) {
                     return false;
                 }
@@ -3117,43 +3120,20 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public boolean isOutputAvailable() {
                 try {
-                    return 
contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), 
event.getContentClaimSection(), event.getContentClaimIdentifier()));
+                    return 
contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), 
event.getContentClaimSection(),
+                        event.getContentClaimIdentifier(), 
event.getContentClaimOffset()));
                 } catch (final IOException e) {
                     return false;
                 }
             }
 
-            private ContentClaim createClaim(final String container, final 
String section, final String identifier) {
+            private ContentClaim createClaim(final String container, final 
String section, final String identifier, final Long offset) {
                 if (container == null || section == null || identifier == 
null) {
                     return null;
                 }
 
-                return new ContentClaim() {
-                    @Override
-                    public int compareTo(final ContentClaim o) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return identifier;
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return container;
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return section;
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return false;
-                    }
-                };
+                final StandardResourceClaim resourceClaim = new 
StandardResourceClaim(container, section, identifier, false);
+                return new StandardContentClaim(resourceClaim, offset == null 
? 0L : offset.longValue());
             }
 
             @Override
@@ -3170,45 +3150,48 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         requireNonNull(requestUri);
 
         final ContentClaim claim;
-        final Long offset;
         final long size;
+        final long offset;
         if (direction == ContentDirection.INPUT) {
             if (provEvent.getPreviousContentClaimContainer() == null || 
provEvent.getPreviousContentClaimSection() == null || 
provEvent.getPreviousContentClaimIdentifier() == null) {
                 throw new IllegalArgumentException("Input Content Claim not 
specified");
             }
 
-            claim = 
contentClaimManager.newContentClaim(provEvent.getPreviousContentClaimContainer(),
 provEvent.getPreviousContentClaimSection(), 
provEvent.getPreviousContentClaimIdentifier(), false);
-            offset = provEvent.getPreviousContentClaimOffset();
+            final ResourceClaim resourceClaim = 
contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(),
 provEvent.getPreviousContentClaimSection(),
+                provEvent.getPreviousContentClaimIdentifier(), false);
+            claim = new StandardContentClaim(resourceClaim, 
provEvent.getPreviousContentClaimOffset());
+            offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : 
provEvent.getPreviousContentClaimOffset();
             size = provEvent.getPreviousFileSize();
         } else {
             if (provEvent.getContentClaimContainer() == null || 
provEvent.getContentClaimSection() == null || 
provEvent.getContentClaimIdentifier() == null) {
                 throw new IllegalArgumentException("Output Content Claim not 
specified");
             }
 
-            claim = 
contentClaimManager.newContentClaim(provEvent.getContentClaimContainer(), 
provEvent.getContentClaimSection(), provEvent.getContentClaimIdentifier(), 
false);
-            offset = provEvent.getContentClaimOffset();
+            final ResourceClaim resourceClaim = 
contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), 
provEvent.getContentClaimSection(),
+                provEvent.getContentClaimIdentifier(), false);
+
+            claim = new StandardContentClaim(resourceClaim, 
provEvent.getContentClaimOffset());
+            offset = provEvent.getContentClaimOffset() == null ? 0L : 
provEvent.getContentClaimOffset();
             size = provEvent.getFileSize();
         }
 
         final InputStream rawStream = contentRepository.read(claim);
-        if (offset != null) {
-            StreamUtils.skip(rawStream, offset.longValue());
-        }
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord sendEvent = new 
StandardProvenanceEventRecord.Builder()
-                .setEventType(ProvenanceEventType.SEND)
-                .setFlowFileUUID(provEvent.getFlowFileUuid())
-                .setAttributes(provEvent.getAttributes(), Collections.<String, 
String>emptyMap())
-                .setCurrentContentClaim(claim.getContainer(), 
claim.getSection(), claim.getId(), offset, size)
-                .setTransitUri(requestUri)
-                .setEventTime(System.currentTimeMillis())
-                .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
-                .setLineageStartDate(provEvent.getLineageStartDate())
-                .setComponentType(getName())
-                .setComponentId(getRootGroupId())
-                .setDetails("Download of " + (direction == 
ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + 
requestor + " for Provenance Event " + provEvent.getEventId())
-                .build();
+            .setEventType(ProvenanceEventType.SEND)
+            .setFlowFileUUID(provEvent.getFlowFileUuid())
+            .setAttributes(provEvent.getAttributes(), Collections.<String, 
String> emptyMap())
+            .setCurrentContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), offset, size)
+            .setTransitUri(requestUri)
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
+            .setLineageStartDate(provEvent.getLineageStartDate())
+            .setComponentType(getName())
+            .setComponentId(getRootGroupId())
+            .setDetails("Download of " + (direction == ContentDirection.INPUT 
? "Input" : "Output") + " Content requested by " + requestor + " for Provenance 
Event " + provEvent.getEventId())
+            .build();
 
         provenanceEventRepository.registerEvent(sendEvent);
 
@@ -3233,7 +3216,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
 
         try {
-            if 
(!contentRepository.isAccessible(contentClaimManager.newContentClaim(contentClaimContainer,
 contentClaimSection, contentClaimId, false))) {
+            final ResourceClaim resourceClaim = 
contentClaimManager.newResourceClaim(contentClaimContainer, 
contentClaimSection, contentClaimId, false);
+            final ContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
+
+            if (!contentRepository.isAccessible(contentClaim)) {
                 return "Content is no longer available in Content Repository";
             }
         } catch (final IOException ioe) {
@@ -3310,18 +3296,20 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
 
         // Create the ContentClaim
-        final ContentClaim claim = 
contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(),
+        final ResourceClaim resourceClaim = 
contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
                 event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the 
Content Claim
-        contentClaimManager.incrementClaimantCount(claim);
+        contentClaimManager.incrementClaimantCount(resourceClaim);
+        final long claimOffset = event.getPreviousContentClaimOffset() == null 
? 0L : event.getPreviousContentClaimOffset().longValue();
+        final StandardContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, claimOffset);
+        contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : 
event.getPreviousFileSize());
 
-        if (!contentRepository.isAccessible(claim)) {
-            contentClaimManager.decrementClaimantCount(claim);
+        if (!contentRepository.isAccessible(contentClaim)) {
+            contentClaimManager.decrementClaimantCount(resourceClaim);
             throw new IllegalStateException("Cannot replay data from 
Provenance Event because the data is no longer available in the Content 
Repository");
         }
 
-        final long claimOffset = event.getPreviousContentClaimOffset() == null 
? 0L : event.getPreviousContentClaimOffset().longValue();
         final String parentUUID = event.getFlowFileUuid();
 
         // Create the FlowFile Record
@@ -3331,39 +3319,39 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         final String newFlowFileUUID = UUID.randomUUID().toString();
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                // Copy relevant info from source FlowFile
-                .addAttributes(event.getPreviousAttributes())
-                .contentClaim(claim)
-                .contentClaimOffset(claimOffset)
-                .entryDate(System.currentTimeMillis())
-                .id(flowFileRepository.getNextFlowFileSequence())
-                .lineageIdentifiers(lineageIdentifiers)
-                .lineageStartDate(event.getLineageStartDate())
-                .size(contentSize.longValue())
-                // Create a new UUID and add attributes indicating that this 
is a replay
-                .addAttribute("flowfile.replay", "true")
-                .addAttribute("flowfile.replay.timestamp", String.valueOf(new 
Date()))
-                .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
-                // remove attributes that may have existed on the source 
FlowFile that we don't want to exist on the new FlowFile
-                .removeAttributes(CoreAttributes.DISCARD_REASON.key(), 
CoreAttributes.ALTERNATE_IDENTIFIER.key())
-                // build the record
-                .build();
+            // Copy relevant info from source FlowFile
+            .addAttributes(event.getPreviousAttributes())
+            .contentClaim(contentClaim)
+            .contentClaimOffset(0L) // use 0 because we used the content claim 
offset in the Content Claim itself
+            .entryDate(System.currentTimeMillis())
+            .id(flowFileRepository.getNextFlowFileSequence())
+            .lineageIdentifiers(lineageIdentifiers)
+            .lineageStartDate(event.getLineageStartDate())
+            .size(contentSize.longValue())
+            // Create a new UUID and add attributes indicating that this is a 
replay
+            .addAttribute("flowfile.replay", "true")
+            .addAttribute("flowfile.replay.timestamp", String.valueOf(new 
Date()))
+            .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
+            // remove attributes that may have existed on the source FlowFile 
that we don't want to exist on the new FlowFile
+            .removeAttributes(CoreAttributes.DISCARD_REASON.key(), 
CoreAttributes.ALTERNATE_IDENTIFIER.key())
+            // build the record
+            .build();
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord replayEvent = new 
StandardProvenanceEventRecord.Builder()
-                .setEventType(ProvenanceEventType.REPLAY)
-                .addChildUuid(newFlowFileUUID)
-                .addParentUuid(parentUUID)
-                .setFlowFileUUID(parentUUID)
-                .setAttributes(Collections.<String, String>emptyMap(), 
flowFileRecord.getAttributes())
-                .setCurrentContentClaim(event.getContentClaimSection(), 
event.getContentClaimContainer(), event.getContentClaimIdentifier(), 
event.getContentClaimOffset(), event.getFileSize())
-                .setDetails("Replay requested by " + requestor)
-                .setEventTime(System.currentTimeMillis())
-                .setFlowFileEntryDate(System.currentTimeMillis())
-                .setLineageStartDate(event.getLineageStartDate())
-                .setComponentType(event.getComponentType())
-                .setComponentId(event.getComponentId())
-                .build();
+            .setEventType(ProvenanceEventType.REPLAY)
+            .addChildUuid(newFlowFileUUID)
+            .addParentUuid(parentUUID)
+            .setFlowFileUUID(parentUUID)
+            .setAttributes(Collections.<String, String> emptyMap(), 
flowFileRecord.getAttributes())
+            .setCurrentContentClaim(event.getContentClaimSection(), 
event.getContentClaimContainer(), event.getContentClaimIdentifier(), 
event.getContentClaimOffset(), event.getFileSize())
+            .setDetails("Replay requested by " + requestor)
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(System.currentTimeMillis())
+            .setLineageStartDate(event.getLineageStartDate())
+            .setComponentType(event.getComponentType())
+            .setComponentId(event.getComponentId())
+            .build();
         provenanceEventRepository.registerEvent(replayEvent);
 
         // Update the FlowFile Repository to indicate that we have added the 
FlowFile to the flow

Reply via email to