http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index fe34fe0..a85b23b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -22,7 +22,9 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**
  * <p>
@@ -32,10 +34,10 @@ import 
org.apache.nifi.controller.repository.claim.ContentClaimManager;
 public class VolatileFlowFileRepository implements FlowFileRepository {
 
     private final AtomicLong idGenerator = new AtomicLong(0L);
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
+    public void initialize(final ResourceClaimManager claimManager) {
         this.claimManager = claimManager;
     }
 
@@ -58,23 +60,49 @@ public class VolatileFlowFileRepository implements 
FlowFileRepository {
     public void close() throws IOException {
     }
 
+    private void markDestructable(final ContentClaim contentClaim) {
+        if (contentClaim == null) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        if (resourceClaim == null) {
+            return;
+        }
+
+        claimManager.markDestructable(resourceClaim);
+    }
+
+    private int getClaimantCount(final ContentClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        if (resourceClaim == null) {
+            return 0;
+        }
+
+        return claimManager.getClaimantCount(resourceClaim);
+    }
+
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) 
throws IOException {
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if current claim's 
claimant count <= 0, mark it as destructable
-                if (record.getCurrentClaim() != null && 
claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) {
-                    claimManager.markDestructable(record.getCurrentClaim());
+                if (record.getCurrentClaim() != null && 
getClaimantCount(record.getCurrentClaim()) <= 0) {
+                    markDestructable(record.getCurrentClaim());
                 }
 
                 // If the original claim is different than the current claim 
and the original claim has a claimant count <= 0, mark it as destructable.
-                if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimManager.markDestructable(record.getOriginalClaim());
+                if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
+                    markDestructable(record.getOriginalClaim());
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
                 // if we have an update, and the original is no longer needed, 
mark original as destructable
-                if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
-                    claimManager.markDestructable(record.getOriginalClaim());
+                if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
+                    markDestructable(record.getOriginalClaim());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index f2df821..5ee5fb5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -43,11 +43,12 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wali.MinimalLockingWriteAheadLog;
@@ -86,7 +87,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     // effectively final
     private WriteAheadRepository<RepositoryRecord> wal;
     private WriteAheadRecordSerde serde;
-    private ContentClaimManager claimManager;
+    private ResourceClaimManager claimManager;
 
     // WALI Provides the ability to register callbacks for when a Partition or 
the entire Repository is sync'ed with the underlying disk.
     // We keep track of this because we need to ensure that the ContentClaims 
are destroyed only after the FlowFile Repository has been
@@ -125,7 +126,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     }
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) throws 
IOException {
+    public void initialize(final ResourceClaimManager claimManager) throws 
IOException {
         this.claimManager = claimManager;
 
         Files.createDirectories(flowFileRepositoryPath);
@@ -168,6 +169,32 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         updateRepository(records, alwaysSync);
     }
 
+    private void markDestructable(final ContentClaim contentClaim) {
+        if (contentClaim == null) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        if (resourceClaim == null) {
+            return;
+        }
+
+        claimManager.markDestructable(resourceClaim);
+    }
+
+    private int getClaimantCount(final ContentClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        if (resourceClaim == null) {
+            return 0;
+        }
+
+        return claimManager.getClaimantCount(resourceClaim);
+    }
+
     private void updateRepository(final Collection<RepositoryRecord> records, 
final boolean sync) throws IOException {
         for (final RepositoryRecord record : records) {
             if (record.getType() != RepositoryRecordType.DELETE && 
record.getType() != RepositoryRecordType.CONTENTMISSING && 
record.getDestination() == null) {
@@ -190,17 +217,17 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if current claim's 
claimant count <= 0, mark it as destructable
-                if (record.getCurrentClaim() != null && 
claimManager.getClaimantCount(record.getCurrentClaim()) <= 0) {
+                if (record.getCurrentClaim() != null && 
getClaimantCount(record.getCurrentClaim()) <= 0) {
                     claimsToAdd.add(record.getCurrentClaim());
                 }
 
                 // If the original claim is different than the current claim 
and the original claim has a claimant count <= 0, mark it as destructable.
-                if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
+                if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
                     claimsToAdd.add(record.getOriginalClaim());
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
                 // if we have an update, and the original is no longer needed, 
mark original as destructable
-                if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
claimManager.getClaimantCount(record.getOriginalClaim()) <= 0) {
+                if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
                     claimsToAdd.add(record.getOriginalClaim());
                 }
             }
@@ -212,7 +239,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             BlockingQueue<ContentClaim> claimQueue = 
claimsAwaitingDestruction.get(partitionKey);
             if (claimQueue == null) {
                 claimQueue = new LinkedBlockingQueue<>();
-                BlockingQueue<ContentClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
+                final BlockingQueue<ContentClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
                 if (existingClaimQueue != null) {
                     claimQueue = existingClaimQueue;
                 }
@@ -222,9 +249,6 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         }
     }
 
-    private void markDestructable(final ContentClaim claim) {
-        claimManager.markDestructable(claim);
-    }
 
     @Override
     public void onSync(final int partitionIndex) {
@@ -307,7 +331,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         for (final RepositoryRecord record : recordList) {
             final ContentClaim claim = record.getCurrentClaim();
             if (claim != null) {
-                claimManager.incrementClaimantCount(claim);
+                claimManager.incrementClaimantCount(claim.getResourceClaim());
             }
         }
 
@@ -339,7 +363,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                     final long start = System.nanoTime();
                     final int numRecordsCheckpointed = checkpoint();
                     final long end = System.nanoTime();
-                    final long millis = TimeUnit.MILLISECONDS.convert((end - 
start), TimeUnit.NANOSECONDS);
+                    final long millis = TimeUnit.MILLISECONDS.convert(end - 
start, TimeUnit.NANOSECONDS);
                     logger.info("Successfully checkpointed FlowFile Repository 
with {} records in {} milliseconds",
                             new Object[]{numRecordsCheckpointed, millis});
                 } catch (final IOException e) {
@@ -378,9 +402,9 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
         private Map<String, FlowFileQueue> flowFileQueueMap = null;
         private long recordsRestored = 0L;
-        private final ContentClaimManager claimManager;
+        private final ResourceClaimManager claimManager;
 
-        public WriteAheadRecordSerde(final ContentClaimManager claimManager) {
+        public WriteAheadRecordSerde(final ResourceClaimManager claimManager) {
             this.claimManager = claimManager;
         }
 
@@ -518,7 +542,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             }
 
             final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
-            RepositoryRecord record = currentRecordStates.get(recordId);
+            final RepositoryRecord record = currentRecordStates.get(recordId);
             ffBuilder.id(recordId);
             if (record != null) {
                 ffBuilder.fromFlowFile(record.getCurrent());
@@ -705,11 +729,16 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                 out.write(0);
             } else {
                 out.write(1);
-                writeString(claim.getId(), out);
-                writeString(claim.getContainer(), out);
-                writeString(claim.getSection(), out);
+
+                final ResourceClaim resourceClaim = claim.getResourceClaim();
+                writeString(resourceClaim.getId(), out);
+                writeString(resourceClaim.getContainer(), out);
+                writeString(resourceClaim.getSection(), out);
+                out.writeLong(claim.getOffset());
+                out.writeLong(claim.getLength());
+
                 out.writeLong(offset);
-                out.writeBoolean(claim.isLossTolerant());
+                out.writeBoolean(resourceClaim.isLossTolerant());
             }
         }
 
@@ -726,6 +755,17 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
                 final String container = readString(in);
                 final String section = readString(in);
+
+                final long resourceOffset;
+                final long resourceLength;
+                if (serializationVersion < 7) {
+                    resourceOffset = 0L;
+                    resourceLength = -1L;
+                } else {
+                    resourceOffset = in.readLong();
+                    resourceLength = in.readLong();
+                }
+
                 final long claimOffset = in.readLong();
 
                 final boolean lossTolerant;
@@ -735,8 +775,11 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                     lossTolerant = false;
                 }
 
-                final ContentClaim existingClaim = 
claimManager.newContentClaim(container, section, claimId, lossTolerant);
-                ffBuilder.contentClaim(existingClaim);
+                final ResourceClaim resourceClaim = 
claimManager.newResourceClaim(container, section, claimId, lossTolerant);
+                final StandardContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, resourceOffset);
+                contentClaim.setLength(resourceLength);
+
+                ffBuilder.contentClaim(contentClaim);
                 ffBuilder.contentClaimOffset(claimOffset);
             } else if (claimExists == -1) {
                 throw new EOFException();
@@ -785,16 +828,16 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                 throw new EOFException();
             }
             if (firstValue == 0xff && secondValue == 0xff) {
-                int ch1 = in.read();
-                int ch2 = in.read();
-                int ch3 = in.read();
-                int ch4 = in.read();
+                final int ch1 = in.read();
+                final int ch2 = in.read();
+                final int ch3 = in.read();
+                final int ch4 = in.read();
                 if ((ch1 | ch2 | ch3 | ch4) < 0) {
                     throw new EOFException();
                 }
-                return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+                return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
             } else {
-                return ((firstValue << 8) + (secondValue));
+                return (firstValue << 8) + secondValue;
             }
         }
 
@@ -834,7 +877,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
         @Override
         public int getVersion() {
-            return 6;
+            return 7;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index a8a6963..ea047c7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.controller.repository.claim;
 
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * <p>
@@ -28,115 +27,79 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public final class StandardContentClaim implements ContentClaim, 
Comparable<ContentClaim> {
 
-    private final String id;
-    private final String container;
-    private final String section;
-    private final boolean lossTolerant;
-    private final AtomicInteger claimantCount = new AtomicInteger(0);
-    private final int hashCode;
+    private final ResourceClaim resourceClaim;
+    private final long offset;
+    private volatile long length;
 
-    StandardContentClaim(final String container, final String section, final 
String id, final boolean lossTolerant) {
-        this.container = container.intern();
-        this.section = section.intern();
-        this.id = id;
-        this.lossTolerant = lossTolerant;
-
-        hashCode = (int) (17 + 19 * (id.hashCode()) + 19 * 
container.hashCode() + 19 * section.hashCode());
-    }
-
-    @Override
-    public boolean isLossTolerant() {
-        return lossTolerant;
+    public StandardContentClaim(final ResourceClaim resourceClaim, final long 
offset) {
+        this.resourceClaim = resourceClaim;
+        this.offset = offset;
+        this.length = -1L;
     }
 
-    /**
-     * @return the unique identifier for this claim
-     */
-    @Override
-    public String getId() {
-        return id;
+    public void setLength(final long length) {
+        this.length = length;
     }
 
-    /**
-     * @return the container identifier in which this claim is held
-     */
     @Override
-    public String getContainer() {
-        return container;
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result;
+        result = prime * result + (int) (length ^ length >>> 32);
+        result = prime * result + (int) (offset ^ offset >>> 32);
+        result = prime * result + (resourceClaim == null ? 0 : 
resourceClaim.hashCode());
+        return result;
     }
 
-    /**
-     * @return the section within a given container the claim is held
-     */
     @Override
-    public String getSection() {
-        return section;
-    }
-
-    int getClaimantCount() {
-        return claimantCount.get();
-    }
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
 
-    int decrementClaimantCount() {
-        return claimantCount.decrementAndGet();
-    }
+        if (obj == null) {
+            return false;
+        }
 
-    int incrementClaimantCount() {
-        return claimantCount.incrementAndGet();
-    }
+        if (!(obj instanceof ContentClaim)) {
+            return false;
+        }
 
-    /**
-     * Provides the natural ordering for ContentClaim objects. By default they 
are sorted by their id, then container, then section
-     *
-     * @param other other claim
-     * @return x such that x <=1 if this is less than other;
-     * x=0 if this.equals(other);
-     * x >= 1 if this is greater than other
-     */
-    @Override
-    public int compareTo(final ContentClaim other) {
-        final int idComparison = id.compareTo(other.getId());
-        if (idComparison != 0) {
-            return idComparison;
+        final ContentClaim other = (ContentClaim) obj;
+        if (length != other.getLength()) {
+            return false;
         }
 
-        final int containerComparison = 
container.compareTo(other.getContainer());
-        if (containerComparison != 0) {
-            return containerComparison;
+        if (offset != other.getOffset()) {
+            return false;
         }
 
-        return section.compareTo(other.getSection());
+        return resourceClaim.equals(other.getResourceClaim());
     }
 
     @Override
-    public boolean equals(final Object other) {
-        if (this == other) {
-            return true;
+    public int compareTo(final ContentClaim o) {
+        final int resourceComp = resourceClaim.compareTo(o.getResourceClaim());
+        if (resourceComp != 0) {
+            return resourceComp;
         }
 
-        if (other == null) {
-            return false;
-        }
-        if (hashCode != other.hashCode()) {
-            // We check hash code before instanceof because instanceof is 
fairly expensive and for
-            // StandardContentClaim, calling hashCode() simply returns a 
pre-calculated value.
-            return false;
-        }
+        return Long.compare(offset, o.getOffset());
+    }
 
-        if (!(other instanceof ContentClaim)) {
-            return false;
-        }
-        final ContentClaim otherClaim = (ContentClaim) other;
-        return id.equals(otherClaim.getId()) && 
container.equals(otherClaim.getContainer()) && 
section.equals(otherClaim.getSection());
+    @Override
+    public ResourceClaim getResourceClaim() {
+        return resourceClaim;
     }
 
     @Override
-    public int hashCode() {
-        return hashCode;
+    public long getOffset() {
+        return offset;
     }
 
     @Override
-    public String toString() {
-        return "ContentClaim[id=" + id + ", container=" + container + ", 
section=" + section + "]";
+    public long getLength() {
+        return length;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
deleted file mode 100644
index b68f95e..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StandardContentClaimManager implements ContentClaimManager {
-
-    private static final ConcurrentMap<ContentClaim, AtomicInteger> 
claimantCounts = new ConcurrentHashMap<>();
-    private static final Logger logger = 
LoggerFactory.getLogger(StandardContentClaimManager.class);
-
-    private static final BlockingQueue<ContentClaim> destructableClaims = new 
LinkedBlockingQueue<>(50000);
-
-    @Override
-    public ContentClaim newContentClaim(final String container, final String 
section, final String id, final boolean lossTolerant) {
-        return new StandardContentClaim(container, section, id, lossTolerant);
-    }
-
-    private static AtomicInteger getCounter(final ContentClaim claim) {
-        if (claim == null) {
-            return null;
-        }
-
-        AtomicInteger counter = claimantCounts.get(claim);
-        if (counter != null) {
-            return counter;
-        }
-
-        counter = new AtomicInteger(0);
-        AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, 
counter);
-        return (existingCounter == null) ? counter : existingCounter;
-    }
-
-    @Override
-    public int getClaimantCount(final ContentClaim claim) {
-        if (claim == null) {
-            return 0;
-        }
-        final AtomicInteger counter = claimantCounts.get(claim);
-        return (counter == null) ? 0 : counter.get();
-    }
-
-    @Override
-    public int decrementClaimantCount(final ContentClaim claim) {
-        if (claim == null) {
-            return 0;
-        }
-
-        final AtomicInteger counter = claimantCounts.get(claim);
-        if (counter == null) {
-            logger.debug("Decrementing claimant count for {} but claimant 
count is not known. Returning -1", claim);
-            return -1;
-        }
-
-        final int newClaimantCount = counter.decrementAndGet();
-        logger.debug("Decrementing claimant count for {} to {}", claim, 
newClaimantCount);
-        if (newClaimantCount == 0) {
-            claimantCounts.remove(claim);
-        }
-        return newClaimantCount;
-    }
-
-    @Override
-    public int incrementClaimantCount(final ContentClaim claim) {
-        return incrementClaimantCount(claim, false);
-    }
-
-    @Override
-    public int incrementClaimantCount(final ContentClaim claim, final boolean 
newClaim) {
-        final AtomicInteger counter = getCounter(claim);
-
-        final int newClaimantCount = counter.incrementAndGet();
-        logger.debug("Incrementing claimant count for {} to {}", claim, 
newClaimantCount);
-        // If the claimant count moved from 0 to 1, remove it from the queue 
of destructable claims.
-        if (!newClaim && newClaimantCount == 1) {
-            destructableClaims.remove(claim);
-        }
-        return newClaimantCount;
-    }
-
-    @Override
-    public void markDestructable(final ContentClaim claim) {
-        if (claim == null) {
-            return;
-        }
-
-        if (getClaimantCount(claim) > 0) {
-            return;
-        }
-
-        logger.debug("Marking claim {} as destructable", claim);
-        try {
-            while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
-            }
-        } catch (final InterruptedException ie) {
-        }
-    }
-
-    @Override
-    public void drainDestructableClaims(final Collection<ContentClaim> 
destination, final int maxElements) {
-        final int drainedCount = destructableClaims.drainTo(destination, 
maxElements);
-        logger.debug("Drained {} destructable claims to {}", drainedCount, 
destination);
-    }
-
-    @Override
-    public void drainDestructableClaims(final Collection<ContentClaim> 
destination, final int maxElements, final long timeout, final TimeUnit unit) {
-        try {
-            final ContentClaim firstClaim = destructableClaims.poll(timeout, 
unit);
-            if (firstClaim != null) {
-                destination.add(firstClaim);
-                destructableClaims.drainTo(destination, maxElements - 1);
-            }
-        } catch (final InterruptedException e) {
-        }
-    }
-
-    @Override
-    public void purge() {
-        claimantCounts.clear();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
new file mode 100644
index 0000000..bd3ed5a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StandardResourceClaim implements ResourceClaim, 
Comparable<ResourceClaim> {
+    private final String id;
+    private final String container;
+    private final String section;
+    private final boolean lossTolerant;
+    private final AtomicInteger claimantCount = new AtomicInteger(0);
+    private final int hashCode;
+
+    public StandardResourceClaim(final String container, final String section, 
final String id, final boolean lossTolerant) {
+        this.container = container.intern();
+        this.section = section.intern();
+        this.id = id;
+        this.lossTolerant = lossTolerant;
+
+        hashCode = 17 + 19 * id.hashCode() + 19 * container.hashCode() + 19 * 
section.hashCode();
+    }
+
+    @Override
+    public boolean isLossTolerant() {
+        return lossTolerant;
+    }
+
+    /**
+     * @return the unique identifier for this claim
+     */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * @return the container identifier in which this claim is held
+     */
+    @Override
+    public String getContainer() {
+        return container;
+    }
+
+    /**
+     * @return the section within a given container the claim is held
+     */
+    @Override
+    public String getSection() {
+        return section;
+    }
+
+    int getClaimantCount() {
+        return claimantCount.get();
+    }
+
+    int decrementClaimantCount() {
+        return claimantCount.decrementAndGet();
+    }
+
+    int incrementClaimantCount() {
+        return claimantCount.incrementAndGet();
+    }
+
+    /**
+     * Provides the natural ordering for ResourceClaim objects. By default 
they are sorted by their id, then container, then section
+     *
+     * @param other other claim
+     * @return x such that x <=1 if this is less than other;
+     *         x=0 if this.equals(other);
+     *         x >= 1 if this is greater than other
+     */
+    @Override
+    public int compareTo(final ResourceClaim other) {
+        final int idComparison = id.compareTo(other.getId());
+        if (idComparison != 0) {
+            return idComparison;
+        }
+
+        final int containerComparison = 
container.compareTo(other.getContainer());
+        if (containerComparison != 0) {
+            return containerComparison;
+        }
+
+        return section.compareTo(other.getSection());
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null) {
+            return false;
+        }
+        if (hashCode != other.hashCode()) {
+            // We check hash code before instanceof because instanceof is 
fairly expensive and for
+            // StandardResourceClaim, calling hashCode() simply returns a 
pre-calculated value.
+            return false;
+        }
+
+        if (!(other instanceof ResourceClaim)) {
+            return false;
+        }
+        final ResourceClaim otherClaim = (ResourceClaim) other;
+        return id.equals(otherClaim.getId()) && 
container.equals(otherClaim.getContainer()) && 
section.equals(otherClaim.getSection());
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public String toString() {
+        return "StandardResourceClaim[id=" + id + ", container=" + container + 
", section=" + section + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
new file mode 100644
index 0000000..4826ac3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.claim;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardResourceClaimManager implements ResourceClaimManager {
+
+    private static final ConcurrentMap<ResourceClaim, AtomicInteger> 
claimantCounts = new ConcurrentHashMap<>();
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardResourceClaimManager.class);
+
+    private static final BlockingQueue<ResourceClaim> destructableClaims = new 
LinkedBlockingQueue<>(50000);
+
+    @Override
+    public ResourceClaim newResourceClaim(final String container, final String 
section, final String id, final boolean lossTolerant) {
+        return new StandardResourceClaim(container, section, id, lossTolerant);
+    }
+
+    private static AtomicInteger getCounter(final ResourceClaim claim) {
+        if (claim == null) {
+            return null;
+        }
+
+        AtomicInteger counter = claimantCounts.get(claim);
+        if (counter != null) {
+            return counter;
+        }
+
+        counter = new AtomicInteger(0);
+        final AtomicInteger existingCounter = 
claimantCounts.putIfAbsent(claim, counter);
+        return existingCounter == null ? counter : existingCounter;
+    }
+
+    @Override
+    public int getClaimantCount(final ResourceClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+        final AtomicInteger counter = claimantCounts.get(claim);
+        return counter == null ? 0 : counter.get();
+    }
+
+    @Override
+    public int decrementClaimantCount(final ResourceClaim claim) {
+        if (claim == null) {
+            return 0;
+        }
+
+        final AtomicInteger counter = claimantCounts.get(claim);
+        if (counter == null) {
+            logger.debug("Decrementing claimant count for {} but claimant 
count is not known. Returning -1", claim);
+            return -1;
+        }
+
+        final int newClaimantCount = counter.decrementAndGet();
+        logger.debug("Decrementing claimant count for {} to {}", claim, 
newClaimantCount);
+        if (newClaimantCount == 0) {
+            claimantCounts.remove(claim);
+        }
+        return newClaimantCount;
+    }
+
+    @Override
+    public int incrementClaimantCount(final ResourceClaim claim) {
+        return incrementClaimantCount(claim, false);
+    }
+
+    @Override
+    public int incrementClaimantCount(final ResourceClaim claim, final boolean 
newClaim) {
+        final AtomicInteger counter = getCounter(claim);
+
+        final int newClaimantCount = counter.incrementAndGet();
+        logger.debug("Incrementing claimant count for {} to {}", claim, 
newClaimantCount);
+        // If the claimant count moved from 0 to 1, remove it from the queue 
of destructable claims.
+        if (!newClaim && newClaimantCount == 1) {
+            destructableClaims.remove(claim);
+        }
+        return newClaimantCount;
+    }
+
+    @Override
+    public void markDestructable(final ResourceClaim claim) {
+        if (claim == null) {
+            return;
+        }
+
+        if (getClaimantCount(claim) > 0) {
+            return;
+        }
+
+        logger.debug("Marking claim {} as destructable", claim);
+        try {
+            while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
+            }
+        } catch (final InterruptedException ie) {
+        }
+    }
+
+    @Override
+    public void drainDestructableClaims(final Collection<ResourceClaim> 
destination, final int maxElements) {
+        final int drainedCount = destructableClaims.drainTo(destination, 
maxElements);
+        logger.debug("Drained {} destructable claims to {}", drainedCount, 
destination);
+    }
+
+    @Override
+    public void drainDestructableClaims(final Collection<ResourceClaim> 
destination, final int maxElements, final long timeout, final TimeUnit unit) {
+        try {
+            final ResourceClaim firstClaim = destructableClaims.poll(timeout, 
unit);
+            if (firstClaim != null) {
+                destination.add(firstClaim);
+                destructableClaims.drainTo(destination, maxElements - 1);
+            }
+        } catch (final InterruptedException e) {
+        }
+    }
+
+    @Override
+    public void purge() {
+        claimantCounts.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 364dcad..a17bd40 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -30,8 +30,8 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -48,7 +48,7 @@ public class TestFileSystemSwapManager {
             final FlowFileQueue flowFileQueue = 
Mockito.mock(FlowFileQueue.class);
             
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-            final List<FlowFileRecord> records = 
FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new 
NopContentClaimManager());
+            final List<FlowFileRecord> records = 
FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new 
NopResourceClaimManager());
             assertEquals(10000, records.size());
 
             for (final FlowFileRecord record : records) {
@@ -58,43 +58,43 @@ public class TestFileSystemSwapManager {
         }
     }
 
-    public class NopContentClaimManager implements ContentClaimManager {
+    public class NopResourceClaimManager implements ResourceClaimManager {
 
         @Override
-        public ContentClaim newContentClaim(String container, String section, 
String id, boolean lossTolerant) {
+        public ResourceClaim newResourceClaim(String container, String 
section, String id, boolean lossTolerant) {
             return null;
         }
 
         @Override
-        public int getClaimantCount(ContentClaim claim) {
+        public int getClaimantCount(ResourceClaim claim) {
             return 0;
         }
 
         @Override
-        public int decrementClaimantCount(ContentClaim claim) {
+        public int decrementClaimantCount(ResourceClaim claim) {
             return 0;
         }
 
         @Override
-        public int incrementClaimantCount(ContentClaim claim) {
+        public int incrementClaimantCount(ResourceClaim claim) {
             return 0;
         }
 
         @Override
-        public int incrementClaimantCount(ContentClaim claim, boolean 
newClaim) {
+        public int incrementClaimantCount(ResourceClaim claim, boolean 
newClaim) {
             return 0;
         }
 
         @Override
-        public void markDestructable(ContentClaim claim) {
+        public void markDestructable(ResourceClaim claim) {
         }
 
         @Override
-        public void drainDestructableClaims(Collection<ContentClaim> 
destination, int maxElements) {
+        public void drainDestructableClaims(Collection<ResourceClaim> 
destination, int maxElements) {
         }
 
         @Override
-        public void drainDestructableClaims(Collection<ContentClaim> 
destination, int maxElements, long timeout, TimeUnit unit) {
+        public void drainDestructableClaims(Collection<ResourceClaim> 
destination, int maxElements, long timeout, TimeUnit unit) {
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index ada0775..5ffcb3d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -16,11 +16,10 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.FileSystemRepository;
-import org.apache.nifi.controller.repository.ContentNotFoundException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
@@ -39,10 +38,11 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
-
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,20 +53,25 @@ public class TestFileSystemRepository {
     public static final File helloWorldFile = new 
File("src/test/resources/hello.txt");
 
     private FileSystemRepository repository = null;
+    private final File rootFile = new File("target/content_repository");
 
     @Before
     public void setup() throws IOException {
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
"src/test/resources/nifi.properties");
-        final File repo = new File("target/content_repository");
-        if (repo.exists()) {
-            DiskUtils.deleteRecursively(repo);
+        if (rootFile.exists()) {
+            DiskUtils.deleteRecursively(rootFile);
         }
 
         repository = new FileSystemRepository();
-        repository.initialize(new StandardContentClaimManager());
+        repository.initialize(new StandardResourceClaimManager());
         repository.purge();
     }
 
+    @After
+    public void shutdown() throws IOException {
+        repository.shutdown();
+    }
+
     @Test
     public void testCreateContentClaim() throws IOException {
         // value passed to #create is irrelevant because the 
FileSystemRepository does not currently support loss tolerance.
@@ -98,6 +103,79 @@ public class TestFileSystemRepository {
     }
 
     @Test
+    public void testResourceClaimReused() throws IOException {
+        final ContentClaim claim1 = repository.create(false);
+        final ContentClaim claim2 = repository.create(false);
+
+        // should not be equal because claim1 may still be in use
+        assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
+
+        try (final OutputStream out = repository.write(claim1)) {
+        }
+
+        final ContentClaim claim3 = repository.create(false);
+        assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim());
+    }
+
+    @Test
+    public void testResourceClaimNotReusedAfterRestart() throws IOException, 
InterruptedException {
+        final ContentClaim claim1 = repository.create(false);
+        try (final OutputStream out = repository.write(claim1)) {
+        }
+
+        repository.shutdown();
+        Thread.sleep(1000L);
+
+        repository = new FileSystemRepository();
+        repository.initialize(new StandardResourceClaimManager());
+        repository.purge();
+
+        final ContentClaim claim2 = repository.create(false);
+        assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim());
+    }
+
+
+    @Test
+    public void testWriteWithNoContent() throws IOException {
+        final ContentClaim claim1 = repository.create(false);
+        try (final OutputStream out = repository.write(claim1)) {
+            out.write("Hello".getBytes());
+        }
+
+        final ContentClaim claim2 = repository.create(false);
+        assertEquals(claim1.getResourceClaim(), claim2.getResourceClaim());
+        try (final OutputStream out = repository.write(claim2)) {
+
+        }
+
+        final ContentClaim claim3 = repository.create(false);
+        assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim());
+        try (final OutputStream out = repository.write(claim3)) {
+            out.write(" World".getBytes());
+        }
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final InputStream in = repository.read(claim1)) {
+            StreamUtils.copy(in, baos);
+        }
+
+        assertEquals("Hello", baos.toString());
+
+        baos.reset();
+        try (final InputStream in = repository.read(claim2)) {
+            StreamUtils.copy(in, baos);
+        }
+        assertEquals("", baos.toString());
+        assertEquals(0, baos.size());
+
+        baos.reset();
+        try (final InputStream in = repository.read(claim3)) {
+            StreamUtils.copy(in, baos);
+        }
+        assertEquals(" World", baos.toString());
+    }
+
+    @Test
     public void testRemoveDeletesFileIfNoClaimants() throws IOException {
         final ContentClaim claim = repository.create(true);
         assertNotNull(claim);
@@ -155,8 +233,16 @@ public class TestFileSystemRepository {
         final byte[] data = Files.readAllBytes(path);
         final byte[] expected = Files.readAllBytes(testFile.toPath());
         assertTrue(Arrays.equals(expected, data));
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final InputStream in = repository.read(claim)) {
+            StreamUtils.copy(in, baos);
+        }
+
+        assertTrue(Arrays.equals(expected, baos.toByteArray()));
     }
 
+
     @Test
     public void testImportFromStream() throws IOException {
         final ContentClaim claim = repository.create(false);
@@ -314,9 +400,9 @@ public class TestFileSystemRepository {
         }
 
         final ContentClaim destination = repository.create(true);
-        final byte[] headerBytes = (header == null) ? null : header.getBytes();
-        final byte[] footerBytes = (footer == null) ? null : footer.getBytes();
-        final byte[] demarcatorBytes = (demarcator == null) ? null : 
demarcator.getBytes();
+        final byte[] headerBytes = header == null ? null : header.getBytes();
+        final byte[] footerBytes = footer == null ? null : footer.getBytes();
+        final byte[] demarcatorBytes = demarcator == null ? null : 
demarcator.getBytes();
         repository.merge(claims, destination, headerBytes, footerBytes, 
demarcatorBytes);
 
         final StringBuilder sb = new StringBuilder();
@@ -334,8 +420,12 @@ public class TestFileSystemRepository {
         }
         final String expectedText = sb.toString();
         final byte[] expected = expectedText.getBytes();
-        final Path path = getPath(destination);
-        final byte[] actual = Files.readAllBytes(path);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) 
destination.getLength());
+        try (final InputStream in = repository.read(destination)) {
+            StreamUtils.copy(in, baos);
+        }
+        final byte[] actual = baos.toByteArray();
         assertTrue(Arrays.equals(expected, actual));
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 3486875..b1fd4c7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -51,8 +51,11 @@ import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.Relationship;
@@ -179,7 +182,7 @@ public class TestStandardProcessSession {
         when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
 
         contentRepo = new MockContentRepository();
-        contentRepo.initialize(new StandardContentClaimManager());
+        contentRepo.initialize(new StandardResourceClaimManager());
         flowFileRepo = new MockFlowFileRepository();
 
         final ProcessContext context = new ProcessContext(connectable, new 
AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, 
provenanceRepo);
@@ -194,10 +197,10 @@ public class TestStandardProcessSession {
         assertEquals(1, contentRepo.getExistingClaims().size());
 
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile flowFile = session.get();
@@ -295,10 +298,10 @@ public class TestStandardProcessSession {
     public void testAppendAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
@@ -316,12 +319,12 @@ public class TestStandardProcessSession {
     public void testReadAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
-        FlowFile flowFile = session.get();
+        final FlowFile flowFile = session.get();
         assertNotNull(flowFile);
         final ObjectHolder<InputStream> inputStreamHolder = new 
ObjectHolder<>(null);
         session.read(flowFile, new InputStreamCallback() {
@@ -337,10 +340,10 @@ public class TestStandardProcessSession {
     public void testStreamAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
@@ -361,10 +364,10 @@ public class TestStandardProcessSession {
     public void testWriteAfterSessionClosesStream() throws IOException {
         final ContentClaim claim = contentRepo.create(false);
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .contentClaim(claim)
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
@@ -382,9 +385,9 @@ public class TestStandardProcessSession {
     public void testCreateThenRollbackRemovesContent() throws IOException {
 
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         final StreamCallback nop = new StreamCallback() {
@@ -402,7 +405,7 @@ public class TestStandardProcessSession {
 
         session.write(flowFile2, nop);
 
-        FlowFile flowFile3 = session.create();
+        final FlowFile flowFile3 = session.create();
         session.write(flowFile3, nop);
 
         session.rollback();
@@ -412,14 +415,14 @@ public class TestStandardProcessSession {
     @Test
     public void testForksNotEmittedIfFilesDeleted() throws IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.remove(newFlowFile);
         session.commit();
 
@@ -429,14 +432,14 @@ public class TestStandardProcessSession {
     @Test
     public void testProvenanceEventsEmittedForForkIfNotRemoved() throws 
IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
 
@@ -446,15 +449,15 @@ public class TestStandardProcessSession {
     @Test
     public void testProvenanceEventsEmittedForRemove() throws IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
-        FlowFile secondNewFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
+        final FlowFile secondNewFlowFile = session.create(orig);
         session.remove(newFlowFile);
         session.transfer(secondNewFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
@@ -465,16 +468,16 @@ public class TestStandardProcessSession {
     @Test
     public void testUpdateAttributesThenJoin() throws IOException {
         final FlowFileRecord flowFileRecord1 = new 
StandardFlowFileRecord.Builder()
-                .id(1L)
-                .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .id(1L)
+            .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
-                .id(2L)
-                .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .id(2L)
+            .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord1);
         flowFileQueue.put(flowFileRecord2);
@@ -538,17 +541,17 @@ public class TestStandardProcessSession {
     @Test
     public void testForkOneToOneReported() throws IOException {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
 
         flowFileQueue.put(flowFileRecord);
 
         // we have to increment the ID generator because we are creating a 
FlowFile without the FlowFile Repository's knowledge
         flowFileRepo.idGenerator.getAndIncrement();
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.getProvenanceReporter().fork(newFlowFile, 
Collections.singleton(orig));
         session.remove(orig);
@@ -566,7 +569,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void 
testProcessExceptionThrownIfCallbackThrowsInOutputStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -610,7 +613,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void testProcessExceptionThrownIfCallbackThrowsInStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -655,41 +658,16 @@ public class TestStandardProcessSession {
     @Test
     public void testMissingFlowFileExceptionThrownWhenUnableToReadData() {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "x";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "x";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .size(1L)
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .size(1L)
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         // attempt to read the data.
         try {
-            FlowFile ff1 = session.get();
+            final FlowFile ff1 = session.get();
 
             session.read(ff1, new InputStreamCallback() {
                 @Override
@@ -704,41 +682,16 @@ public class TestStandardProcessSession {
     @Test
     public void 
testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "x";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "x";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .size(1L)
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .size(1L)
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         // attempt to read the data.
         try {
-            FlowFile ff1 = session.get();
+            final FlowFile ff1 = session.get();
 
             session.write(ff1, new StreamCallback() {
                 @Override
@@ -753,35 +706,10 @@ public class TestStandardProcessSession {
     @Test
     public void 
testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge()
 {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -794,43 +722,18 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .contentClaimOffset(1000L)
-                .size(1000L)
-                .build();
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaimOffset(1000L)
+            .size(1000L)
+            .build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
         try {
             session.get();
-            FlowFile ff2 = session.get();
+            final FlowFile ff2 = session.get();
             session.write(ff2, new StreamCallback() {
                 @Override
                 public void process(InputStream in, OutputStream out) throws 
IOException {
@@ -844,34 +747,11 @@ public class TestStandardProcessSession {
     @Test
     public void 
testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .build();
 
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
-
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                }).build();
         flowFileQueue.put(flowFileRecord);
 
         FlowFile ff1 = session.get();
@@ -884,41 +764,17 @@ public class TestStandardProcessSession {
         session.commit();
 
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
-                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
-                .entryDate(System.currentTimeMillis())
-                .contentClaim(new ContentClaim() {
-                    @Override
-                    public int compareTo(ContentClaim arg0) {
-                        return 0;
-                    }
-
-                    @Override
-                    public String getId() {
-                        return "0";
-                    }
-
-                    @Override
-                    public String getContainer() {
-                        return "container";
-                    }
-
-                    @Override
-                    public String getSection() {
-                        return "section";
-                    }
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
 
-                    @Override
-                    public boolean isLossTolerant() {
-                        return true;
-                    }
-                })
-                .contentClaimOffset(1000L).size(1L).build();
+            .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.
         try {
             session.get();
-            FlowFile ff2 = session.get();
+            final FlowFile ff2 = session.get();
             session.read(ff2, new InputStreamCallback() {
                 @Override
                 public void process(InputStream in) throws IOException {
@@ -931,7 +787,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void 
testProcessExceptionThrownIfCallbackThrowsInInputStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -975,7 +831,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void testCreateEmitted() throws IOException {
-        FlowFile newFlowFile = session.create();
+        final FlowFile newFlowFile = session.create();
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
 
@@ -1009,9 +865,9 @@ public class TestStandardProcessSession {
     @Test
     public void testContentModifiedEmittedAndNotAttributesModified() throws 
IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
-                .id(1L)
-                .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
-                .build();
+            .id(1L)
+            .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+            .build();
         this.flowFileQueue.put(flowFile);
 
         FlowFile existingFlowFile = session.get();
@@ -1035,9 +891,9 @@ public class TestStandardProcessSession {
     @Test
     public void testAttributesModifiedEmitted() throws IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
-                .id(1L)
-                .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
-                .build();
+            .id(1L)
+            .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+            .build();
         this.flowFileQueue.put(flowFile);
 
         FlowFile existingFlowFile = session.get();
@@ -1104,7 +960,7 @@ public class TestStandardProcessSession {
         }
 
         @Override
-        public void initialize(ContentClaimManager claimManager) throws 
IOException {
+        public void initialize(ResourceClaimManager claimManager) throws 
IOException {
         }
     }
 
@@ -1112,9 +968,9 @@ public class TestStandardProcessSession {
 
         private final AtomicLong idGenerator = new AtomicLong(0L);
         private final AtomicLong claimsRemoved = new AtomicLong(0L);
-        private ContentClaimManager claimManager;
+        private ResourceClaimManager claimManager;
 
-        private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = 
new ConcurrentHashMap<>();
+        private final ConcurrentMap<ContentClaim, AtomicInteger> 
claimantCounts = new ConcurrentHashMap<>();
 
         @Override
         public void shutdown() {
@@ -1124,9 +980,10 @@ public class TestStandardProcessSession {
             final Set<ContentClaim> claims = new HashSet<>();
 
             for (long i = 0; i < idGenerator.get(); i++) {
-                final ContentClaim claim = 
claimManager.newContentClaim("container", "section", String.valueOf(i), false);
-                if (getClaimantCount(claim) > 0) {
-                    claims.add(claim);
+                final ResourceClaim resourceClaim = new 
StandardResourceClaim("container", "section", String.valueOf(i), false);
+                final ContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, 0L);
+                if (getClaimantCount(contentClaim) > 0) {
+                    claims.add(contentClaim);
                 }
             }
 
@@ -1135,15 +992,17 @@ public class TestStandardProcessSession {
 
         @Override
         public ContentClaim create(boolean lossTolerant) throws IOException {
-            final ContentClaim claim = 
claimManager.newContentClaim("container", "section", 
String.valueOf(idGenerator.getAndIncrement()), false);
-            claimantCounts.put(claim, new AtomicInteger(1));
-            final Path path = getPath(claim);
+            final ResourceClaim resourceClaim = 
claimManager.newResourceClaim("container", "section", 
String.valueOf(idGenerator.getAndIncrement()), false);
+            final ContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, 0L);
+
+            claimantCounts.put(contentClaim, new AtomicInteger(1));
+            final Path path = getPath(contentClaim);
             final Path parent = path.getParent();
             if (Files.exists(parent) == false) {
                 Files.createDirectories(parent);
             }
-            Files.createFile(getPath(claim));
-            return claim;
+            Files.createFile(getPath(contentClaim));
+            return contentClaim;
         }
 
         @Override
@@ -1219,7 +1078,8 @@ public class TestStandardProcessSession {
             return 0;
         }
 
-        private Path getPath(final ContentClaim claim) {
+        private Path getPath(final ContentClaim contentClaim) {
+            final ResourceClaim claim = contentClaim.getResourceClaim();
             return 
Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId());
         }
 
@@ -1315,7 +1175,7 @@ public class TestStandardProcessSession {
         }
 
         @Override
-        public void initialize(ContentClaimManager claimManager) throws 
IOException {
+        public void initialize(ResourceClaimManager claimManager) throws 
IOException {
             this.claimManager = claimManager;
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
index a32f321..5733164 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.controller.repository.VolatileContentRepository;
+
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayOutputStream;
@@ -29,10 +30,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,11 +43,11 @@ import org.mockito.Mockito;
 
 public class TestVolatileContentRepository {
 
-    private ContentClaimManager claimManager;
+    private ResourceClaimManager claimManager;
 
     @Before
     public void setup() {
-        claimManager = new StandardContentClaimManager();
+        claimManager = new StandardResourceClaimManager();
     }
 
     @Test
@@ -81,8 +83,9 @@ public class TestVolatileContentRepository {
 
         final ContentRepository mockRepo = 
Mockito.mock(ContentRepository.class);
         contentRepo.setBackupRepository(mockRepo);
-        final ContentClaim newClaim = 
claimManager.newContentClaim("container", "section", "1000", true);
-        
Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(newClaim);
+        final ResourceClaim resourceClaim = 
claimManager.newResourceClaim("container", "section", "1000", true);
+        final ContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, 0L);
+        
Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim);
 
         final ByteArrayOutputStream overflowStream = new 
ByteArrayOutputStream();
         
Mockito.when(mockRepo.write(Matchers.any(ContentClaim.class))).thenReturn(overflowStream);

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 054ef5e..2138928 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -34,7 +34,7 @@ import java.util.List;
 
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.util.file.FileUtils;
 
 import org.junit.Test;
@@ -53,7 +53,7 @@ public class TestWriteAheadFlowFileRepository {
         }
 
         final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository();
-        repo.initialize(new StandardContentClaimManager());
+        repo.initialize(new StandardResourceClaimManager());
 
         final List<Connection> connectionList = new ArrayList<>();
         final QueueProvider queueProvider = new QueueProvider() {
@@ -119,7 +119,7 @@ public class TestWriteAheadFlowFileRepository {
 
         // restore
         final WriteAheadFlowFileRepository repo2 = new 
WriteAheadFlowFileRepository();
-        repo2.initialize(new StandardContentClaimManager());
+        repo2.initialize(new StandardResourceClaimManager());
         repo2.loadFlowFiles(queueProvider, 0L);
 
         assertEquals(1, flowFileCollection.size());

Reply via email to