http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 1171636..0a9acc4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -23,14 +23,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
-import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
@@ -61,18 +58,21 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.controller.repository.io.SyncOnCloseOutputStream;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.file.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,8 +92,11 @@ public class FileSystemRepository implements 
ContentRepository {
     private final AtomicLong index;
 
     private final ScheduledExecutorService executor = new FlowEngine(4, 
"FileSystemRepository Workers", true);
-    private final ConcurrentMap<String, BlockingQueue<ContentClaim>> 
reclaimable = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> 
reclaimable = new ConcurrentHashMap<>();
     private final Map<String, ContainerState> containerStateMap = new 
HashMap<>();
+    private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB
+    private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new 
LinkedBlockingQueue<>(100);
+    private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> 
writableClaimStreams = new ConcurrentHashMap<>(100);
 
     private final boolean archiveData;
     private final long maxArchiveMillis;
@@ -101,7 +104,7 @@ public class FileSystemRepository implements 
ContentRepository {
     private final boolean alwaysSync;
     private final ScheduledExecutorService containerCleanupExecutor;
 
-    private ContentClaimManager contentClaimManager; // effectively final
+    private ResourceClaimManager resourceClaimManager; // effectively final
 
     // Map of container to archived files that should be deleted next.
     private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new 
HashMap<>();
@@ -113,7 +116,7 @@ public class FileSystemRepository implements 
ContentRepository {
         final NiFiProperties properties = NiFiProperties.getInstance();
         // determine the file repository paths and ensure they exist
         final Map<String, Path> fileRespositoryPaths = 
properties.getContentRepositoryPaths();
-        for (Path path : fileRespositoryPaths.values()) {
+        for (final Path path : fileRespositoryPaths.values()) {
             Files.createDirectories(path);
         }
 
@@ -122,7 +125,7 @@ public class FileSystemRepository implements 
ContentRepository {
         index = new AtomicLong(0L);
 
         for (final String containerName : containerNames) {
-            reclaimable.put(containerName, new 
LinkedBlockingQueue<ContentClaim>(10000));
+            reclaimable.put(containerName, new 
LinkedBlockingQueue<ResourceClaim>(10000));
             archivedFiles.put(containerName, new 
LinkedBlockingQueue<ArchiveInfo>(100000));
         }
 
@@ -196,8 +199,8 @@ public class FileSystemRepository implements 
ContentRepository {
     }
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
-        this.contentClaimManager = claimManager;
+    public void initialize(final ResourceClaimManager claimManager) {
+        this.resourceClaimManager = claimManager;
 
         final NiFiProperties properties = NiFiProperties.getInstance();
 
@@ -231,6 +234,13 @@ public class FileSystemRepository implements 
ContentRepository {
     public void shutdown() {
         executor.shutdown();
         containerCleanupExecutor.shutdown();
+
+        // Close every file stream that is still held in writableClaimStreams so that
+        // the underlying file handles are released.
+        for (final OutputStream out : writableClaimStreams.values()) {
+            try {
+                out.close();
+            } catch (final IOException ioe) {
+                // Best effort: a failure to close one stream must not prevent the remaining
+                // streams from being closed, but it should not go unnoticed either.
+                LOG.warn("Failed to close {} during shutdown due to {}", out, ioe.toString());
+            }
+        }
     }
 
     private static double getRatio(final String value) {
@@ -397,8 +407,8 @@ public class FileSystemRepository implements 
ContentRepository {
         final String id = idPath.toFile().getName();
         final String sectionName = sectionPath.toFile().getName();
 
-        final ContentClaim contentClaim = 
contentClaimManager.newContentClaim(containerName, sectionName, id, false);
-        if (contentClaimManager.getClaimantCount(contentClaim) == 0) {
+        final ResourceClaim resourceClaim = 
resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
+        if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
             removeIncompleteContent(fileToRemove);
         }
     }
@@ -427,15 +437,21 @@ public class FileSystemRepository implements 
ContentRepository {
     }
 
     private Path getPath(final ContentClaim claim) {
-        final Path containerPath = containers.get(claim.getContainer());
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        return getPath(resourceClaim);
+    }
+
+    private Path getPath(final ResourceClaim resourceClaim) {
+        final Path containerPath = 
containers.get(resourceClaim.getContainer());
         if (containerPath == null) {
             return null;
         }
-        return 
containerPath.resolve(claim.getSection()).resolve(claim.getId());
+        return 
containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
     }
 
     private Path getPath(final ContentClaim claim, final boolean verifyExists) 
throws ContentNotFoundException {
-        final Path containerPath = containers.get(claim.getContainer());
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+        final Path containerPath = 
containers.get(resourceClaim.getContainer());
         if (containerPath == null) {
             if (verifyExists) {
                 throw new ContentNotFoundException(claim);
@@ -445,11 +461,11 @@ public class FileSystemRepository implements 
ContentRepository {
         }
 
         // Create the Path that points to the data
-        Path resolvedPath = 
containerPath.resolve(claim.getSection()).resolve(String.valueOf(claim.getId()));
+        Path resolvedPath = 
containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
 
         // If the data does not exist, create a Path that points to where the 
data would exist in the archive directory.
         if (!Files.exists(resolvedPath)) {
-            resolvedPath = getArchivePath(claim);
+            resolvedPath = getArchivePath(claim.getResourceClaim());
         }
 
         if (verifyExists && !Files.exists(resolvedPath)) {
@@ -460,34 +476,55 @@ public class FileSystemRepository implements 
ContentRepository {
 
     @Override
     public ContentClaim create(final boolean lossTolerant) throws IOException {
-        final long currentIndex = index.incrementAndGet();
-
-        String containerName = null;
-        boolean waitRequired = true;
-        ContainerState containerState = null;
-        for (long containerIndex = currentIndex; containerIndex < currentIndex 
+ containers.size(); containerIndex++) {
-            final long modulatedContainerIndex = containerIndex % 
containers.size();
-            containerName = containerNames.get((int) modulatedContainerIndex);
-
-            containerState = containerStateMap.get(containerName);
-            if (!containerState.isWaitRequired()) {
-                waitRequired = false;
-                break;
-            }
-        }
+        ResourceClaim resourceClaim;
+
+        // We need to synchronize on this queue because the act of pulling 
something off
+        // the queue and incrementing the associated claimant count MUST be 
done atomically.
+        // This way, if the claimant count is decremented to 0, we can ensure 
that the
+        // claim is not then pulled from the queue and used as another thread 
is destroying/archiving
+        // the claim.
+        final long resourceOffset;
+        synchronized (writableClaimQueue) {
+            final ClaimLengthPair pair = writableClaimQueue.poll();
+            if (pair == null) {
+                final long currentIndex = index.incrementAndGet();
+
+                String containerName = null;
+                boolean waitRequired = true;
+                ContainerState containerState = null;
+                for (long containerIndex = currentIndex; containerIndex < 
currentIndex + containers.size(); containerIndex++) {
+                    final long modulatedContainerIndex = containerIndex % 
containers.size();
+                    containerName = containerNames.get((int) 
modulatedContainerIndex);
+
+                    containerState = containerStateMap.get(containerName);
+                    if (!containerState.isWaitRequired()) {
+                        waitRequired = false;
+                        break;
+                    }
+                }
 
-        if (waitRequired) {
-            containerState.waitForArchiveExpiration();
-        }
+                if (waitRequired) {
+                    containerState.waitForArchiveExpiration();
+                }
 
-        final long modulatedSectionIndex = currentIndex % 
SECTIONS_PER_CONTAINER;
-        final String section = String.valueOf(modulatedSectionIndex);
-        final String claimId = System.currentTimeMillis() + "-" + currentIndex;
+                final long modulatedSectionIndex = currentIndex % 
SECTIONS_PER_CONTAINER;
+                final String section = String.valueOf(modulatedSectionIndex);
+                final String claimId = System.currentTimeMillis() + "-" + 
currentIndex;
 
-        final ContentClaim claim = 
contentClaimManager.newContentClaim(containerName, section, claimId, 
lossTolerant);
-        contentClaimManager.incrementClaimantCount(claim, true);
+                resourceClaim = 
resourceClaimManager.newResourceClaim(containerName, section, claimId, 
lossTolerant);
+                resourceOffset = 0L;
+                LOG.debug("Creating new Resource Claim {}", resourceClaim);
+            } else {
+                resourceClaim = pair.getClaim();
+                resourceOffset = pair.getLength();
+                LOG.debug("Reusing Resource Claim {}", resourceClaim);
+            }
+
+            resourceClaimManager.incrementClaimantCount(resourceClaim, true);
+        }
 
-        return claim;
+        final StandardContentClaim scc = new 
StandardContentClaim(resourceClaim, resourceOffset);
+        return scc;
     }
 
     @Override
@@ -496,7 +533,7 @@ public class FileSystemRepository implements 
ContentRepository {
             return 0;
         }
 
-        return contentClaimManager.incrementClaimantCount(claim);
+        return 
resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
     }
 
     @Override
@@ -504,7 +541,7 @@ public class FileSystemRepository implements 
ContentRepository {
         if (claim == null) {
             return 0;
         }
-        return contentClaimManager.getClaimantCount(claim);
+        return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
     }
 
     @Override
@@ -513,7 +550,7 @@ public class FileSystemRepository implements 
ContentRepository {
             return 0;
         }
 
-        final int claimantCount = 
contentClaimManager.decrementClaimantCount(claim);
+        final int claimantCount = 
resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
         return claimantCount;
     }
 
@@ -523,9 +560,30 @@ public class FileSystemRepository implements 
ContentRepository {
             return false;
         }
 
+        return remove(claim.getResourceClaim());
+    }
+
+    private boolean remove(final ResourceClaim claim) {
+        if (claim == null) {
+            return false;
+        }
+
+        // we synchronize on the queue here because if the claimant count is 0,
+        // we need to be able to remove any instance of that resource claim 
from the
+        // queue atomically (i.e., the checking of the claimant count plus 
removal from the queue
+        // must be atomic)
+        synchronized (writableClaimQueue) {
+            final int claimantCount = 
resourceClaimManager.getClaimantCount(claim);
+            if (claimantCount > 0 || writableClaimQueue.contains(new 
ClaimLengthPair(claim, null))) {
+                // if other content claims are claiming the same resource, we 
have nothing to destroy,
+                // so just consider the destruction successful.
+                return true;
+            }
+        }
+
         Path path = null;
         try {
-            path = getPath(claim, false);
+            path = getPath(claim);
         } catch (final ContentNotFoundException cnfe) {
         }
 
@@ -538,6 +596,7 @@ public class FileSystemRepository implements 
ContentRepository {
         return true;
     }
 
+
     @Override
     public ContentClaim clone(final ContentClaim original, final boolean 
lossTolerant) throws IOException {
         if (original == null) {
@@ -545,14 +604,11 @@ public class FileSystemRepository implements 
ContentRepository {
         }
 
         final ContentClaim newClaim = create(lossTolerant);
-        final Path currPath = getPath(original, true);
-        final Path newPath = getPath(newClaim);
-        try (final FileOutputStream fos = new 
FileOutputStream(newPath.toFile())) {
-            Files.copy(currPath, fos);
-            if (alwaysSync) {
-                fos.getFD().sync();
-            }
+        try (final InputStream in = read(original);
+            final OutputStream out = write(newClaim)) {
+            StreamUtils.copy(in, out);
         } catch (final IOException ioe) {
+            decrementClaimantCount(newClaim);
             remove(newClaim);
             throw ioe;
         }
@@ -564,44 +620,28 @@ public class FileSystemRepository implements 
ContentRepository {
         if (claims.contains(destination)) {
             throw new IllegalArgumentException("destination cannot be within 
claims");
         }
-        try (final FileChannel dest = FileChannel.open(getPath(destination), 
StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
-            long position = 0L;
-            if (header != null && header.length > 0) {
-                final ByteBuffer buffer = ByteBuffer.wrap(header);
-                while (buffer.hasRemaining()) {
-                    position += dest.write(buffer, position);
-                }
+
+        try (final ByteCountingOutputStream out = new 
ByteCountingOutputStream(write(destination))) {
+            if (header != null) {
+                out.write(header);
             }
-            int objectIndex = 0;
+
+            int i = 0;
             for (final ContentClaim claim : claims) {
-                long totalCopied = 0L;
-                try (final FileChannel src = FileChannel.open(getPath(claim, 
true), StandardOpenOption.READ)) {
-                    while (totalCopied < src.size()) {
-                        final long copiedThisIteration = 
dest.transferFrom(src, position, Long.MAX_VALUE);
-                        totalCopied += copiedThisIteration;
-                        position += copiedThisIteration;
-                    }
+                try (final InputStream in = read(claim)) {
+                    StreamUtils.copy(in, out);
                 }
-                // don't add demarcator after the last claim
-                if (demarcator != null && demarcator.length > 0 && 
(++objectIndex < claims.size())) {
-                    final ByteBuffer buffer = ByteBuffer.wrap(demarcator);
-                    while (buffer.hasRemaining()) {
-                        position += dest.write(buffer, position);
-                    }
-                }
-            }
-            if (footer != null && footer.length > 0) {
-                final ByteBuffer buffer = ByteBuffer.wrap(footer);
-                while (buffer.hasRemaining()) {
-                    position += dest.write(buffer, position);
+
+                if (++i < claims.size() && demarcator != null) {
+                    out.write(demarcator);
                 }
             }
 
-            if (alwaysSync) {
-                dest.force(true);
+            if (footer != null) {
+                out.write(footer);
             }
 
-            return position;
+            return out.getBytesWritten();
         }
     }
 
@@ -624,12 +664,8 @@ public class FileSystemRepository implements 
ContentRepository {
 
     @Override
     public long importFrom(final InputStream content, final ContentClaim 
claim, final boolean append) throws IOException {
-        try (final FileOutputStream out = new 
FileOutputStream(getPath(claim).toFile(), append)) {
-            final long copied = StreamUtils.copy(content, out);
-            if (alwaysSync) {
-                out.getFD().sync();
-            }
-            return copied;
+        try (final OutputStream out = write(claim, append)) {
+            return StreamUtils.copy(content, out);
         }
     }
 
@@ -642,20 +678,14 @@ public class FileSystemRepository implements 
ContentRepository {
             Files.createFile(destination);
             return 0L;
         }
-        if (append) {
-            try (final FileChannel sourceChannel = 
FileChannel.open(getPath(claim, true), StandardOpenOption.READ);
-                    final FileChannel destinationChannel = 
FileChannel.open(destination, StandardOpenOption.WRITE, 
StandardOpenOption.APPEND)) {
-                long position = destinationChannel.size();
-                final long targetSize = position + sourceChannel.size();
-                while (position < targetSize) {
-                    final long bytesCopied = 
destinationChannel.transferFrom(sourceChannel, position, Long.MAX_VALUE);
-                    position += bytesCopied;
-                }
-                return position;
+
+        try (final InputStream in = read(claim);
+            final FileOutputStream fos = new 
FileOutputStream(destination.toFile(), append)) {
+            final long copied = StreamUtils.copy(in, fos);
+            if (alwaysSync) {
+                fos.getFD().sync();
             }
-        } else {
-            Files.copy(getPath(claim, true), destination, 
StandardCopyOption.REPLACE_EXISTING);
-            return Files.size(destination);
+            return copied;
         }
     }
 
@@ -674,28 +704,20 @@ public class FileSystemRepository implements 
ContentRepository {
 
         final long claimSize = size(claim);
         if (offset > claimSize) {
-            throw new IllegalArgumentException("offset of " + offset + " 
exceeds claim size of " + claimSize);
+            throw new IllegalArgumentException("Offset of " + offset + " 
exceeds claim size of " + claimSize);
 
         }
 
-        if (append) {
-            try (final InputStream sourceStream = 
Files.newInputStream(getPath(claim, true), StandardOpenOption.READ);
-                    final OutputStream destinationStream = 
Files.newOutputStream(destination, StandardOpenOption.WRITE, 
StandardOpenOption.APPEND)) {
-                StreamUtils.skip(sourceStream, offset);
-
-                final byte[] buffer = new byte[8192];
-                int len;
-                long copied = 0L;
-                while ((len = sourceStream.read(buffer, 0, (int) 
Math.min(length - copied, buffer.length))) > 0) {
-                    destinationStream.write(buffer, 0, len);
-                    copied += len;
-                }
-                return copied;
+        try (final InputStream in = read(claim);
+            final FileOutputStream fos = new 
FileOutputStream(destination.toFile(), append)) {
+            if (offset > 0) {
+                StreamUtils.skip(in, offset);
             }
-        } else {
-            try (final OutputStream out = Files.newOutputStream(destination, 
StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
-                return exportTo(claim, out, offset, length);
+            StreamUtils.copy(in, fos, length);
+            if (alwaysSync) {
+                fos.getFD().sync();
             }
+            return length;
         }
     }
 
@@ -704,7 +726,10 @@ public class FileSystemRepository implements 
ContentRepository {
         if (claim == null) {
             return 0L;
         }
-        return Files.copy(getPath(claim, true), destination);
+
+        try (final InputStream in = read(claim)) {
+            return StreamUtils.copy(in, destination);
+        }
     }
 
     @Override
@@ -719,7 +744,7 @@ public class FileSystemRepository implements 
ContentRepository {
         if (offset == 0 && length == claimSize) {
             return exportTo(claim, destination);
         }
-        try (final InputStream in = Files.newInputStream(getPath(claim, 
true))) {
+        try (final InputStream in = read(claim)) {
             StreamUtils.skip(in, offset);
             final byte[] buffer = new byte[8192];
             int len;
@@ -738,7 +763,12 @@ public class FileSystemRepository implements 
ContentRepository {
             return 0L;
         }
 
-        return Files.size(getPath(claim, true));
+        // see javadocs for claim.getLength() as to why we do this.
+        if (claim.getLength() < 0) {
+            return Files.size(getPath(claim, true)) - claim.getOffset();
+        }
+
+        return claim.getLength();
     }
 
     @Override
@@ -747,16 +777,194 @@ public class FileSystemRepository implements 
ContentRepository {
             return new ByteArrayInputStream(new byte[0]);
         }
         final Path path = getPath(claim, true);
-        return new FileInputStream(path.toFile());
+        final FileInputStream fis = new FileInputStream(path.toFile());
+        if (claim.getOffset() > 0L) {
+            StreamUtils.skip(fis, claim.getOffset());
+        }
+
+        // see javadocs for claim.getLength() as to why we do this.
+        if (claim.getLength() >= 0) {
+            return new LimitedInputStream(fis, claim.getLength());
+        } else {
+            return fis;
+        }
     }
 
     @Override
-    @SuppressWarnings("resource")
     public OutputStream write(final ContentClaim claim) throws IOException {
-        final FileOutputStream fos = new 
FileOutputStream(getPath(claim).toFile());
-        return alwaysSync ? new SyncOnCloseOutputStream(fos) : fos;
+        return write(claim, false);
     }
 
+    /**
+     * Returns a stream through which content for the given claim is written to the file that
+     * backs its Resource Claim. The file is always opened in append mode because several Content
+     * Claims may share one Resource Claim. Closing the returned stream does not necessarily close
+     * the file: as long as no write has failed and the Resource Claim is still smaller than
+     * {@code maxAppendClaimLength}, the file stream is handed back for reuse through
+     * {@code writableClaimQueue} and {@code writableClaimStreams}.
+     *
+     * @param claim the claim to write to; must be a {@link StandardContentClaim} whose length is not greater than 0
+     * @param append when a cached file stream is reused, whether the claim's already-recorded length
+     *            is used as the starting point for the length that is tracked while writing
+     * @return a stream that updates the claim's length as bytes are written
+     * @throws NullPointerException if {@code claim} is null
+     * @throws IllegalArgumentException if the claim is not a StandardContentClaim or has already been written to
+     * @throws IOException if the backing file cannot be opened
+     */
+    private OutputStream write(final ContentClaim claim, final boolean append) throws IOException {
+        if (claim == null) {
+            throw new NullPointerException("ContentClaim cannot be null");
+        }
+
+        if (!(claim instanceof StandardContentClaim)) {
+            // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything
+            // else, just throw an Exception because it is not valid for this Repository
+            throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does not belong to this Content Repository");
+        }
+
+        final StandardContentClaim scc = (StandardContentClaim) claim;
+        if (claim.getLength() > 0) {
+            throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to.");
+        }
+
+        // we always append because there may be another ContentClaim using the same resource claim.
+        // However, we know that we will never write to the same claim from two different threads
+        // at the same time because we will call create() to get the claim before we write to it,
+        // and when we call create(), it will remove it from the Queue, which means that no other
+        // thread will get the same Claim until we've finished writing to it.
+        ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim());
+        final long initialLength;
+        if (claimStream == null) {
+            final File file = getPath(scc).toFile();
+            // use a synchronized stream because we want to pass this OutputStream out from one thread to another.
+            claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length());
+            initialLength = 0L;
+        } else {
+            if (append) {
+                initialLength = Math.max(0, scc.getLength());
+            } else {
+                initialLength = 0;
+            }
+        }
+
+        final ByteCountingOutputStream bcos = claimStream;
+        final OutputStream out = new OutputStream() {
+            // number of bytes written through this wrapper (not through the shared file stream)
+            private long bytesWritten = 0L;
+            // cleared as soon as a write fails, so that a broken file stream is never reused
+            private boolean recycle = true;
+            private boolean closed = false;
+
+            @Override
+            public String toString() {
+                return "FileSystemRepository Stream [" + scc + "]";
+            }
+
+            @Override
+            public synchronized void write(final int b) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten++;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void write(final byte[] b) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten += b.length;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                try {
+                    bcos.write(b, off, len);
+                } catch (final IOException ioe) {
+                    recycle = false;
+                    throw new IOException("Failed to write to " + this, ioe);
+                }
+
+                bytesWritten += len;
+                scc.setLength(bytesWritten + initialLength);
+            }
+
+            @Override
+            public synchronized void flush() throws IOException {
+                if (closed) {
+                    throw new IOException("Stream is closed");
+                }
+
+                bcos.flush();
+            }
+
+            @Override
+            public synchronized void close() throws IOException {
+                if (closed) {
+                    // Closeable requires repeated close() calls to have no effect. Without this guard a
+                    // second call would offer the Resource Claim to the queue again, or close a file
+                    // stream that has already been handed to another writer.
+                    return;
+                }
+                closed = true;
+
+                if (alwaysSync) {
+                    try {
+                        ((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
+                    } catch (final IOException ioe) {
+                        // The sync failed, so the file stream is neither recycled nor left open;
+                        // release the file handle before reporting the failure.
+                        try {
+                            bcos.close();
+                        } catch (final IOException closeFailure) {
+                            ioe.addSuppressed(closeFailure);
+                        }
+                        throw ioe;
+                    }
+                }
+
+                if (scc.getLength() < 0) {
+                    // If claim was not written to, set length to 0
+                    scc.setLength(0L);
+                }
+
+                // if we've not yet hit the threshold for appending to a resource claim, add the claim
+                // to the writableClaimQueue so that the Resource Claim can be used again when create()
+                // is called. In this case, we don't have to actually close the file stream. Instead, we
+                // can just add it onto the queue and continue to use it for the next content claim.
+                final long resourceClaimLength = scc.getOffset() + scc.getLength();
+                if (recycle && resourceClaimLength < maxAppendClaimLength) {
+                    // we do not have to synchronize on the writable claim queue here because we
+                    // are only adding something to the queue. We must synchronize if we are
+                    // using a ResourceClaim from the queue and incrementing the claimant count on that resource
+                    // because those need to be done atomically, or if we are destroying a claim that is on
+                    // the queue because we need to ensure that the latter operation does not cause problems
+                    // with the former.
+                    final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
+                    final boolean enqueued = writableClaimQueue.offer(pair);
+
+                    if (enqueued) {
+                        writableClaimStreams.put(scc.getResourceClaim(), bcos);
+                        LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+                    } else {
+                        bcos.close();
+
+                        LOG.debug("Claim length less than max; Closing {} because could not add back to queue", this);
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+                        }
+                    }
+                } else {
+                    // we've reached the limit for this claim. Don't add it back to our queue.
+                    // Instead, just remove it and move on.
+
+                    // ensure that the claim is no longer on the queue
+                    writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
+                    bcos.close();
+                    LOG.debug("Claim lenth >= max; Closing {}", this);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
+                    }
+                }
+            }
+        };
+
+        LOG.debug("Writing to {}", out);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for writing to " + out));
+        }
+
+        return out;
+    }
+
+
     @Override
     public void purge() {
         // delete all content from repositories
@@ -788,7 +996,7 @@ public class FileSystemRepository implements 
ContentRepository {
             }
         }
 
-        contentClaimManager.purge();
+        resourceClaimManager.purge();
     }
 
     private class BinDestructableClaims implements Runnable {
@@ -800,17 +1008,17 @@ public class FileSystemRepository implements 
ContentRepository {
                 // because the Container generally maps to a physical 
partition on the disk, so we want a few
                 // different threads hitting the different partitions but 
don't want multiple threads hitting
                 // the same partition.
-                final List<ContentClaim> toDestroy = new ArrayList<>();
+                final List<ResourceClaim> toDestroy = new ArrayList<>();
                 while (true) {
                     toDestroy.clear();
-                    contentClaimManager.drainDestructableClaims(toDestroy, 
10000);
+                    resourceClaimManager.drainDestructableClaims(toDestroy, 
10000);
                     if (toDestroy.isEmpty()) {
                         return;
                     }
 
-                    for (final ContentClaim claim : toDestroy) {
+                    for (final ResourceClaim claim : toDestroy) {
                         final String container = claim.getContainer();
-                        final BlockingQueue<ContentClaim> claimQueue = 
reclaimable.get(container);
+                        final BlockingQueue<ResourceClaim> claimQueue = 
reclaimable.get(container);
 
                         try {
                             while (true) {
@@ -838,7 +1046,7 @@ public class FileSystemRepository implements 
ContentRepository {
         return sectionPath.resolve(ARCHIVE_DIR_NAME).resolve(claimId);
     }
 
-    private Path getArchivePath(final ContentClaim claim) {
+    private Path getArchivePath(final ResourceClaim claim) {
         final String claimId = claim.getId();
         final Path containerPath = containers.get(claim.getContainer());
         final Path archivePath = 
containerPath.resolve(claim.getSection()).resolve(ARCHIVE_DIR_NAME).resolve(claimId);
@@ -859,22 +1067,28 @@ public class FileSystemRepository implements 
ContentRepository {
             return true;
         }
 
-        return Files.exists(getArchivePath(contentClaim));
+        return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
     }
 
-    private void archive(final ContentClaim contentClaim) throws IOException {
+    private void archive(final ResourceClaim claim) throws IOException {
         if (!archiveData) {
             return;
         }
 
-        final int claimantCount = getClaimantCount(contentClaim);
-        if (claimantCount > 0) {
-            throw new IllegalStateException("Cannot archive ContentClaim " + 
contentClaim + " because it is currently in use");
+        synchronized (writableClaimQueue) {
+            final int claimantCount = claim == null ? 0 : 
resourceClaimManager.getClaimantCount(claim);
+            if (claimantCount > 0 || writableClaimQueue.contains(new 
ClaimLengthPair(claim, null))) {
+                return;
+            }
+        }
+
+        final Path curPath = getPath(claim);
+        if (curPath == null) {
+            return;
         }
 
-        final Path curPath = getPath(contentClaim, true);
         archive(curPath);
-        LOG.debug("Successfully moved {} to archive", contentClaim);
+        LOG.debug("Successfully moved {} to archive", claim);
     }
 
     private void archive(final Path curPath) throws IOException {
@@ -1107,8 +1321,8 @@ public class FileSystemRepository implements 
ContentRepository {
                 while (true) {
                     // look through each of the binned queues of Content Claims
                     int successCount = 0;
-                    final List<ContentClaim> toRemove = new ArrayList<>();
-                    for (final Map.Entry<String, BlockingQueue<ContentClaim>> 
entry : reclaimable.entrySet()) {
+                    final List<ResourceClaim> toRemove = new ArrayList<>();
+                    for (final Map.Entry<String, BlockingQueue<ResourceClaim>> 
entry : reclaimable.entrySet()) {
                         // drain the queue of all ContentClaims that can be 
destroyed for the given container.
                         final String container = entry.getKey();
                         final ContainerState containerState = 
containerStateMap.get(container);
@@ -1121,7 +1335,7 @@ public class FileSystemRepository implements 
ContentRepository {
 
                         // destroy each claim for this container
                         final long start = System.nanoTime();
-                        for (final ContentClaim claim : toRemove) {
+                        for (final ResourceClaim claim : toRemove) {
                             if (archiveData) {
                                 try {
                                     archive(claim);
@@ -1210,7 +1424,7 @@ public class FileSystemRepository implements 
ContentRepository {
         @Override
         public void run() {
             try {
-                if (oldestArchiveDate.get() > (System.currentTimeMillis() - 
maxArchiveMillis)) {
+                if (oldestArchiveDate.get() > System.currentTimeMillis() - 
maxArchiveMillis) {
                     final Long minRequiredSpace = 
minUsableContainerBytesForArchive.get(containerName);
                     if (minRequiredSpace == null) {
                         return;
@@ -1245,7 +1459,7 @@ public class FileSystemRepository implements 
ContentRepository {
                 if (oldestContainerArchive < 0L) {
                     boolean updated;
                     do {
-                        long oldest = oldestArchiveDate.get();
+                        final long oldest = oldestArchiveDate.get();
                         if (oldestContainerArchive < oldest) {
                             updated = oldestArchiveDate.compareAndSet(oldest, 
oldestContainerArchive);
 
@@ -1298,7 +1512,7 @@ public class FileSystemRepository implements 
ContentRepository {
                     final long free = getContainerUsableSpace(containerName);
                     used = capacity - free;
                     bytesUsed = used;
-                } catch (IOException e) {
+                } catch (final IOException e) {
                     return false;
                 }
             }
@@ -1317,7 +1531,7 @@ public class FileSystemRepository implements 
ContentRepository {
                     try {
                         LOG.info("Unable to write to container {} due to 
archive file size constraints; waiting for archive cleanup", containerName);
                         condition.await();
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                     }
                 }
             } finally {
@@ -1355,4 +1569,55 @@ public class FileSystemRepository implements 
ContentRepository {
         }
     }
 
+
+    private static class ClaimLengthPair {
+        private final ResourceClaim claim;
+        private final Long length; // carried along only; ignored by equals() and hashCode()
+
+        public ClaimLengthPair(final ResourceClaim claim, final Long length) {
+            this.claim = claim;
+            this.length = length;
+        }
+
+        public ResourceClaim getClaim() {
+            return claim;
+        }
+
+        public Long getLength() {
+            return length;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (claim == null ? 0 : claim.hashCode());
+            return result;
+        }
+
+        /**
+         * Equality is determined purely by the ResourceClaim's equality; the length is ignored.
+         *
+         * @param obj the object to compare against
+         * @return true if obj is a ClaimLengthPair whose ResourceClaim equals this one's (or both are null)
+         */
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+
+            if (obj == null) {
+                return false;
+            }
+
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            // null-safe, consistent with hashCode(): a pair may wrap a null claim
+            final ClaimLengthPair other = (ClaimLengthPair) obj;
+            return claim == null ? other.getClaim() == null : claim.equals(other.getClaim());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index 6524cd3..cc8c734 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -95,7 +95,7 @@ public final class StandardFlowFileRecord implements 
FlowFile, FlowFileRecord {
 
     @Override
     public boolean isPenalized() {
-        return (penaltyExpirationMs > 0) ? penaltyExpirationMs > 
System.currentTimeMillis() : false;
+        return penaltyExpirationMs > 0 ? penaltyExpirationMs > 
System.currentTimeMillis() : false;
     }
 
     @Override
@@ -150,7 +150,7 @@ public final class StandardFlowFileRecord implements 
FlowFile, FlowFileRecord {
     public String toString() {
         final ToStringBuilder builder = new ToStringBuilder(this, 
ToStringStyle.SHORT_PREFIX_STYLE);
         builder.append("uuid", getAttribute(CoreAttributes.UUID.key()));
-        builder.append("claim", claim == null ? "" : claim.getId());
+        builder.append("claim", claim == null ? "" : claim.toString());
         builder.append("offset", claimOffset);
         builder.append("name", 
getAttribute(CoreAttributes.FILENAME.key())).append("size", size);
         return builder.toString();
@@ -169,7 +169,7 @@ public final class StandardFlowFileRecord implements 
FlowFile, FlowFileRecord {
         private final Set<String> bLineageIdentifiers = new HashSet<>();
         private long bPenaltyExpirationMs = -1L;
         private long bSize = 0L;
-        private Map<String, String> bAttributes = new HashMap<>();
+        private final Map<String, String> bAttributes = new HashMap<>();
         private ContentClaim bClaim = null;
         private long bClaimOffset = 0L;
         private long bLastQueueDate = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 210b620..0e08325 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -44,6 +44,7 @@ import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
 import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
@@ -54,10 +55,6 @@ import 
org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.repository.io.LongHolder;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.QueueSize;
@@ -75,7 +72,8 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,10 +92,6 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     // determines how many things must be transferred, removed, modified in 
order to avoid logging the FlowFile ID's on commit/rollback
     public static final int VERBOSE_LOG_THRESHOLD = 10;
-    private static final long MAX_APPENDABLE_CLAIM_SIZE = 
DataUnit.parseDataSize(
-            NiFiProperties.getInstance().getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
-    private static final int MAX_FLOWFILES_PER_CLAIM = 
NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
-
     public static final String DEFAULT_FLOWFILE_PATH = "./";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StandardProcessSession.class);
@@ -126,12 +120,6 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     private final LongHolder bytesWritten = new LongHolder(0L);
     private int flowFilesIn = 0, flowFilesOut = 0;
     private long contentSizeIn = 0L, contentSizeOut = 0L;
-    private int writeRecursionLevel = 0;
-
-    private ContentClaim currentWriteClaim = null;
-    private OutputStream currentWriteClaimStream = null;
-    private long currentWriteClaimSize = 0L;
-    private int currentWriteClaimFlowFileCount = 0;
 
     private ContentClaim currentReadClaim = null;
     private ByteCountingInputStream currentReadClaimStream = null;
@@ -706,12 +694,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         if (originalClaim == null) {
             builder.setCurrentContentClaim(null, null, null, null, 0L);
         } else {
+            final ResourceClaim resourceClaim = 
originalClaim.getResourceClaim();
             builder.setCurrentContentClaim(
-                    originalClaim.getContainer(), originalClaim.getSection(), 
originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), 
repoRecord.getOriginal().getSize()
-            );
+                resourceClaim.getContainer(), resourceClaim.getSection(), 
resourceClaim.getId(),
+                repoRecord.getOriginal().getContentClaimOffset() + 
originalClaim.getOffset(), repoRecord.getOriginal().getSize());
             builder.setPreviousContentClaim(
-                    originalClaim.getContainer(), originalClaim.getSection(), 
originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), 
repoRecord.getOriginal().getSize()
-            );
+                resourceClaim.getContainer(), resourceClaim.getSection(), 
resourceClaim.getId(),
+                repoRecord.getOriginal().getContentClaimOffset() + 
originalClaim.getOffset(), repoRecord.getOriginal().getSize());
         }
     }
 
@@ -727,14 +716,18 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             final ContentClaim currentClaim = repoRecord.getCurrentClaim();
             final long currentOffset = repoRecord.getCurrentClaimOffset();
             final long size = flowFile.getSize();
-            recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), 
currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+            final ResourceClaim resourceClaim = 
currentClaim.getResourceClaim();
+            recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), currentOffset + 
currentClaim.getOffset(), size);
         }
 
         if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() 
!= null) {
             final ContentClaim originalClaim = repoRecord.getOriginalClaim();
             final long originalOffset = 
repoRecord.getOriginal().getContentClaimOffset();
             final long originalSize = repoRecord.getOriginal().getSize();
-            
recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), 
originalClaim.getSection(), originalClaim.getId(), originalOffset, 
originalSize);
+
+            final ResourceClaim resourceClaim = 
originalClaim.getResourceClaim();
+            
recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), originalOffset + 
originalClaim.getOffset(), originalSize);
         }
 
         final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -757,14 +750,18 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 final ContentClaim currentClaim = repoRecord.getCurrentClaim();
                 final long currentOffset = repoRecord.getCurrentClaimOffset();
                 final long size = eventFlowFile.getSize();
-                
recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), 
currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+
+                final ResourceClaim resourceClaim = 
currentClaim.getResourceClaim();
+                
recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), currentOffset + 
currentClaim.getOffset(), size);
             }
 
             if (repoRecord.getOriginal() != null && 
repoRecord.getOriginalClaim() != null) {
                 final ContentClaim originalClaim = 
repoRecord.getOriginalClaim();
                 final long originalOffset = 
repoRecord.getOriginal().getContentClaimOffset();
                 final long originalSize = repoRecord.getOriginal().getSize();
-                
recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), 
originalClaim.getSection(), originalClaim.getId(), originalOffset, 
originalSize);
+
+                final ResourceClaim resourceClaim = 
originalClaim.getResourceClaim();
+                
recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), originalOffset + 
originalClaim.getOffset(), originalSize);
             }
 
             final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
@@ -1670,8 +1667,11 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
                             final ContentClaim claim = 
record.getContentClaim();
                             if (claim != null) {
-                                
enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), 
claim.getId(), record.getContentClaimOffset(), record.getSize());
-                                
enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), 
claim.getId(), record.getContentClaimOffset(), record.getSize());
+                                final ResourceClaim resourceClaim = 
claim.getResourceClaim();
+                                
enriched.setCurrentContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(),
+                                    record.getContentClaimOffset() + 
claim.getOffset(), record.getSize());
+                                
enriched.setPreviousContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(),
+                                    record.getContentClaimOffset() + 
claim.getOffset(), record.getSize());
                             }
 
                             enriched.setAttributes(record.getAttributes(), 
Collections.<String, String>emptyMap());
@@ -1715,7 +1715,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                             StreamUtils.skip(currentReadClaimStream, 
bytesToSkip);
                         }
 
-                        return new 
NonCloseableInputStream(currentReadClaimStream);
+                        return new 
DisableOnCloseInputStream(currentReadClaimStream);
                     }
                 }
 
@@ -1731,7 +1731,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
                 // Use a non-closeable stream because we want to keep it open 
after the callback has finished so that we can
                 // reuse the same InputStream for the next FlowFile
-                return new NonCloseableInputStream(currentReadClaimStream);
+                return new DisableOnCloseInputStream(currentReadClaimStream);
             } else {
                 final InputStream rawInStream = 
context.getContentRepository().read(claim);
                 StreamUtils.skip(rawInStream, offset);
@@ -1882,30 +1882,6 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         return newFile;
     }
 
-    private void enforceCurrentWriteClaimState() {
-        if (currentWriteClaimFlowFileCount > MAX_FLOWFILES_PER_CLAIM || 
currentWriteClaimSize > MAX_APPENDABLE_CLAIM_SIZE) {
-            resetWriteClaims();
-        }
-
-        if (currentWriteClaimStream == null) {
-            try {
-                currentWriteClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} to enforce Current 
Write Claim State for {}", currentWriteClaim, context.getConnectable());
-            } catch (final IOException e) {
-                throw new FlowFileHandlingException("Unable to create 
ContentClaim due to " + e.toString(), e);
-            }
-
-            try {
-                currentWriteClaimStream = 
context.getContentRepository().write(currentWriteClaim);
-            } catch (final IOException e) {
-                resetWriteClaims();
-                throw new FlowFileAccessException("Unable to obtain stream for 
writing to Content Repostiory: " + e, e);
-            }
-        } else {
-            
context.getContentRepository().incrementClaimaintCount(currentWriteClaim);
-        }
-    }
-
     private void ensureNotAppending(final ContentClaim claim) throws 
IOException {
         if (claim == null) {
             return;
@@ -1924,79 +1900,36 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     public FlowFile write(final FlowFile source, final OutputStreamCallback 
writer) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
-        long newSize = 0L;
-        long claimOffset = 0L;
 
         ContentClaim newClaim = null;
         final LongHolder writtenHolder = new LongHolder(0L);
-        final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                claimOffset = currentWriteClaimSize;
-                newClaim = currentWriteClaim;
-                ensureNotAppending(newClaim);
-
-                try (final OutputStream disableOnClose = new 
DisableOnCloseOutputStream(currentWriteClaimStream);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnClose, writtenHolder)) {
-
-                    recursionSet.add(source);
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(new 
FlowFileAccessOutputStream(countingOut, source));
-                    } finally {
-                        writeRecursionLevel--;
-                    }
-                } finally {
-                    recursionSet.remove(source);
-                }
-
-                final long writtenThisCall = writtenHolder.getValue();
-                newSize = writtenThisCall;
-                currentWriteClaimSize += newSize;
-            } else {
-                newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'write' for {}", 
newClaim, source);
+            newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'write' for {}", 
newClaim, source);
 
-                ensureNotAppending(newClaim);
-                try (final OutputStream stream = 
context.getContentRepository().write(newClaim);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(stream, writtenHolder)) {
-                    recursionSet.add(source);
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(new 
FlowFileAccessOutputStream(countingOut, source));
-                    } finally {
-                        writeRecursionLevel--;
-                    }
-                } finally {
-                    recursionSet.remove(source);
-                }
-                newSize = context.getContentRepository().size(newClaim);
+            ensureNotAppending(newClaim);
+            try (final OutputStream stream = 
context.getContentRepository().write(newClaim);
+                final OutputStream disableOnClose = new 
DisableOnCloseOutputStream(stream);
+                final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnClose, writtenHolder)) {
+                recursionSet.add(source);
+                writer.process(new FlowFileAccessOutputStream(countingOut, 
source));
+            } finally {
+                recursionSet.remove(source);
             }
         } catch (final ContentNotFoundException nfe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            }
+            resetWriteClaims(); // need to reset write claim before we can 
remove the claim
             destroyContent(newClaim);
             handleContentNotFound(nfe, record);
         } catch (final FlowFileAccessException ffae) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            }
+            resetWriteClaims(); // need to reset write claim before we can 
remove the claim
             destroyContent(newClaim);
             throw ffae;
         } catch (final IOException ioe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            }
+            resetWriteClaims(); // need to reset write claim before we can 
remove the claim
             destroyContent(newClaim);
             throw new ProcessException("IOException thrown from " + 
connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            }
+            resetWriteClaims(); // need to reset write claim before we can 
remove the claim
             destroyContent(newClaim);
             throw t;
         } finally {
@@ -2004,7 +1937,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }
 
         removeTemporaryClaim(record);
-        final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+            .fromFlowFile(record.getCurrent())
+            .contentClaim(newClaim)
+            .contentClaimOffset(0)
+            .size(writtenHolder.getValue())
+            .build();
+
         record.setWorking(newFile);
         return newFile;
     }
@@ -2041,13 +1980,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                     // wrap our OutputStreams so that the processor cannot 
close it
                     try (final OutputStream disableOnClose = new 
DisableOnCloseOutputStream(outStream)) {
                         recursionSet.add(source);
-
-                        writeRecursionLevel++;
-                        try {
-                            writer.process(new 
FlowFileAccessOutputStream(disableOnClose, source));
-                        } finally {
-                            writeRecursionLevel--;
-                        }
+                        writer.process(new 
FlowFileAccessOutputStream(disableOnClose, source));
                     } finally {
                         recursionSet.remove(source);
                     }
@@ -2059,13 +1992,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 // wrap our OutputStreams so that the processor cannot close it
                 try (final OutputStream disableOnClose = new 
DisableOnCloseOutputStream(outStream)) {
                     recursionSet.add(source);
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(disableOnClose);
-                    } finally {
-                        writeRecursionLevel--;
-                    }
+                    writer.process(disableOnClose);
                 } finally {
                     recursionSet.remove(source);
                 }
@@ -2142,25 +2069,6 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     private void resetWriteClaims(final boolean suppressExceptions) {
-        try {
-            if (currentWriteClaimStream != null) {
-                try {
-                    currentWriteClaimStream.flush();
-                } finally {
-                    currentWriteClaimStream.close();
-                }
-            }
-        } catch (final IOException e) {
-            if (!suppressExceptions) {
-                throw new FlowFileAccessException("Unable to flush the output 
of FlowFile to the Content Repository");
-            }
-        }
-
-        currentWriteClaimStream = null;
-        currentWriteClaim = null;
-        currentWriteClaimFlowFileCount = 0;
-        currentWriteClaimSize = 0L;
-
         for (final ByteCountingOutputStream out : appendableStreams.values()) {
             try {
                 try {
@@ -2188,16 +2096,6 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         currentReadClaim = null;
     }
 
-    /**
-     * @return Indicates whether or not multiple FlowFiles should be merged 
into
-     * a single ContentClaim
-     */
-    private boolean isMergeContent() {
-        if (writeRecursionLevel > 0) {
-            return false;
-        }
-        return true;
-    }
 
     @Override
     public FlowFile write(final FlowFile source, final StreamCallback writer) {
@@ -2206,117 +2104,55 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final ContentClaim currClaim = record.getCurrentClaim();
 
         ContentClaim newClaim = null;
-        long newSize = 0L;
-        long claimOffset = 0L;
         final LongHolder writtenHolder = new LongHolder(0L);
-        final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                claimOffset = currentWriteClaimSize;
-                newClaim = currentWriteClaim;
-                ensureNotAppending(newClaim);
-
-                try (final InputStream rawIn = getInputStream(source, 
currClaim, record.getCurrentClaimOffset());
-                        final InputStream limitedIn = new 
LimitedInputStream(rawIn, source.getSize());
-                        final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
-                        final InputStream countingIn = new 
ByteCountingInputStream(disableOnCloseIn, bytesRead);
-                        final OutputStream disableOnCloseOut = new 
DisableOnCloseOutputStream(currentWriteClaimStream);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
-
-                    recursionSet.add(source);
-
-                    // We want to differentiate between IOExceptions thrown by 
the repository and IOExceptions thrown from
-                    // Processor code. As a result, as have the 
FlowFileAccessInputStream that catches IOException from the repository
-                    // and translates into either FlowFileAccessException or 
ContentNotFoundException. We keep track of any
-                    // ContentNotFoundException because if it is thrown, the 
Processor code may catch it and do something else with it
-                    // but in reality, if it is thrown, we want to know about 
it and handle it, even if the Processor code catches it.
-                    final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingIn, source, currClaim);
-                    boolean cnfeThrown = false;
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(ffais, new 
FlowFileAccessOutputStream(countingOut, source));
-                    } catch (final ContentNotFoundException cnfe) {
-                        cnfeThrown = true;
-                        throw cnfe;
-                    } finally {
-                        writeRecursionLevel--;
-                        recursionSet.remove(source);
-
-                        // if cnfeThrown is true, we don't need to re-thrown 
the Exception; it will propagate.
-                        if (!cnfeThrown && ffais.getContentNotFoundException() 
!= null) {
-                            throw ffais.getContentNotFoundException();
-                        }
-                    }
-                }
+            newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'write' for {}", 
newClaim, source);
 
-                final long writtenThisCall = writtenHolder.getValue();
-                newSize = writtenThisCall;
-                currentWriteClaimSize += writtenThisCall;
-            } else {
-                newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'write' for {}", 
newClaim, source);
+            ensureNotAppending(newClaim);
 
-                ensureNotAppending(newClaim);
+            try (final InputStream is = getInputStream(source, currClaim, 
record.getCurrentClaimOffset());
+                final InputStream limitedIn = new LimitedInputStream(is, 
source.getSize());
+                final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+                final InputStream countingIn = new 
ByteCountingInputStream(disableOnCloseIn, bytesRead);
+                final OutputStream os = 
context.getContentRepository().write(newClaim);
+                final OutputStream disableOnCloseOut = new 
DisableOnCloseOutputStream(os);
+                final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
 
-                try (final InputStream is = getInputStream(source, currClaim, 
record.getCurrentClaimOffset());
-                        final InputStream limitedIn = new 
LimitedInputStream(is, source.getSize());
-                        final InputStream countingIn = new 
ByteCountingInputStream(limitedIn, bytesRead);
-                        final OutputStream os = 
context.getContentRepository().write(newClaim);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(os, writtenHolder)) {
+                recursionSet.add(source);
 
-                    recursionSet.add(source);
+                // We want to differentiate between IOExceptions thrown by the 
repository and IOExceptions thrown from
+                // Processor code. As a result, we have the 
FlowFileAccessInputStream that catches IOException from the repository
+                // and translates into either FlowFileAccessException or 
ContentNotFoundException. We keep track of any
+                // ContentNotFoundException because if it is thrown, the 
Processor code may catch it and do something else with it
+                // but in reality, if it is thrown, we want to know about it 
and handle it, even if the Processor code catches it.
+                final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingIn, source, currClaim);
+                boolean cnfeThrown = false;
 
-                    // We want to differentiate between IOExceptions thrown by 
the repository and IOExceptions thrown from
-                    // Processor code. As a result, as have the 
FlowFileAccessInputStream that catches IOException from the repository
-                    // and translates into either FlowFileAccessException or 
ContentNotFoundException. We keep track of any
-                    // ContentNotFoundException because if it is thrown, the 
Processor code may catch it and do something else with it
-                    // but in reality, if it is thrown, we want to know about 
it and handle it, even if the Processor code catches it.
-                    final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingIn, source, currClaim);
-                    boolean cnfeThrown = false;
-
-                    writeRecursionLevel++;
-                    try {
-                        writer.process(ffais, new 
FlowFileAccessOutputStream(countingOut, source));
-                    } catch (final ContentNotFoundException cnfe) {
-                        cnfeThrown = true;
-                        throw cnfe;
-                    } finally {
-                        writeRecursionLevel--;
-                        recursionSet.remove(source);
+                try {
+                    writer.process(ffais, new 
FlowFileAccessOutputStream(countingOut, source));
+                } catch (final ContentNotFoundException cnfe) {
+                    cnfeThrown = true;
+                    throw cnfe;
+                } finally {
+                    recursionSet.remove(source);
 
-                        // if cnfeThrown is true, we don't need to re-thrown 
the Exception; it will propagate.
-                        if (!cnfeThrown && ffais.getContentNotFoundException() 
!= null) {
-                            throw ffais.getContentNotFoundException();
-                        }
+                    // if cnfeThrown is true, we don't need to re-throw the 
Exception; it will propagate.
+                    if (!cnfeThrown && ffais.getContentNotFoundException() != 
null) {
+                        throw ffais.getContentNotFoundException();
                     }
                 }
-
-                newSize = context.getContentRepository().size(newClaim);
             }
         } catch (final ContentNotFoundException nfe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            }
             destroyContent(newClaim);
             handleContentNotFound(nfe, record);
         } catch (final IOException ioe) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            }
             destroyContent(newClaim);
             throw new ProcessException("IOException thrown from " + 
connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final FlowFileAccessException ffae) {
-            if (appendToClaim) {
-                resetWriteClaims();
-            }
             destroyContent(newClaim);
             throw ffae;
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims(); // need to reset write claim before we can 
remove the claim
-            }
             destroyContent(newClaim);
             throw t;
         } finally {
@@ -2324,7 +2160,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }
 
         removeTemporaryClaim(record);
-        final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+            .fromFlowFile(record.getCurrent())
+            .contentClaim(newClaim)
+            .contentClaimOffset(0L)
+            .size(writtenHolder.getValue())
+            .build();
+
         record.setWorking(newFile);
         return newFile;
     }
@@ -2343,33 +2185,20 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final ContentClaim newClaim;
         final long claimOffset;
 
-        final boolean appendToClaim = isMergeContent();
-        if (appendToClaim) {
-            enforceCurrentWriteClaimState();
-            newClaim = currentWriteClaim;
-            claimOffset = currentWriteClaimSize;
-        } else {
-            try {
-                newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'importFrom' for 
{}", newClaim, destination);
-            } catch (final IOException e) {
-                throw new FlowFileAccessException("Unable to create 
ContentClaim due to " + e.toString(), e);
-            }
-
-            claimOffset = 0L;
+        try {
+            newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", 
newClaim, destination);
+        } catch (final IOException e) {
+            throw new FlowFileAccessException("Unable to create ContentClaim 
due to " + e.toString(), e);
         }
 
+        claimOffset = 0L;
         long newSize = 0L;
         try {
-            final boolean append = isMergeContent();
-            newSize = context.getContentRepository().importFrom(source, 
newClaim, append);
+            newSize = context.getContentRepository().importFrom(source, 
newClaim);
             bytesWritten.increment(newSize);
             bytesRead.increment(newSize);
-            currentWriteClaimSize += newSize;
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims();
-            }
             destroyContent(newClaim);
             throw new FlowFileAccessException("Failed to import data from " + 
source + " for " + destination + " due to " + t.toString(), t);
         }
@@ -2392,40 +2221,24 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         validateRecordState(destination);
         final StandardRepositoryRecord record = records.get(destination);
         ContentClaim newClaim = null;
-        long claimOffset = 0L;
+        final long claimOffset = 0L;
 
         final long newSize;
-        final boolean appendToClaim = isMergeContent();
         try {
-            if (appendToClaim) {
-                enforceCurrentWriteClaimState();
-                newClaim = currentWriteClaim;
-                claimOffset = currentWriteClaimSize;
-
-                final long bytesCopied = StreamUtils.copy(source, 
currentWriteClaimStream);
-                bytesWritten.increment(bytesCopied);
-                currentWriteClaimSize += bytesCopied;
-                newSize = bytesCopied;
-            } else {
-                try {
-                    newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                    claimLog.debug("Creating ContentClaim {} for 'importFrom' 
for {}", newClaim, destination);
+            try {
+                newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
+                claimLog.debug("Creating ContentClaim {} for 'importFrom' for 
{}", newClaim, destination);
 
-                    newSize = 
context.getContentRepository().importFrom(source, newClaim, appendToClaim);
-                    bytesWritten.increment(newSize);
-                    currentWriteClaimSize += newSize;
-                } catch (final IOException e) {
-                    throw new FlowFileAccessException("Unable to create 
ContentClaim due to " + e.toString(), e);
-                }
+                newSize = context.getContentRepository().importFrom(source, 
newClaim);
+                bytesWritten.increment(newSize);
+            } catch (final IOException e) {
+                throw new FlowFileAccessException("Unable to create 
ContentClaim due to " + e.toString(), e);
             }
         } catch (final Throwable t) {
-            if (appendToClaim) {
-                resetWriteClaims();
-            }
-
             if (newClaim != null) {
                 destroyContent(newClaim);
             }
+
             throw new FlowFileAccessException("Failed to import data from " + 
source + " for " + destination + " due to " + t.toString(), t);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 3bfdd8a..6c1626c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -27,8 +27,10 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -38,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
 import org.apache.nifi.controller.repository.io.MemoryManager;
@@ -92,7 +95,7 @@ public class VolatileContentRepository implements 
ContentRepository {
     private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap 
= new ConcurrentHashMap<>(256);
     private final AtomicReference<ContentRepository> backupRepositoryRef = new 
AtomicReference<>(null);
 
-    private ContentClaimManager claimManager; // effectively final
+    private ResourceClaimManager claimManager; // effectively final
 
     public VolatileContentRepository() {
         this(NiFiProperties.getInstance());
@@ -119,7 +122,7 @@ public class VolatileContentRepository implements 
ContentRepository {
     }
 
     @Override
-    public void initialize(final ContentClaimManager claimManager) {
+    public void initialize(final ResourceClaimManager claimManager) {
         this.claimManager = claimManager;
 
         for (int i = 0; i < 3; i++) {
@@ -199,9 +202,10 @@ public class VolatileContentRepository implements 
ContentRepository {
 
     private ContentClaim createLossTolerant() {
         final long id = idGenerator.getAndIncrement();
-        final ContentClaim claim = 
claimManager.newContentClaim(CONTAINER_NAME, "section", String.valueOf(id), 
true);
+        final ResourceClaim resourceClaim = 
claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), 
true);
+        final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
         final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
-        claimManager.incrementClaimantCount(claim, true);
+        claimManager.incrementClaimantCount(resourceClaim, true);
 
         claimMap.put(claim, contentBlock);
 
@@ -216,7 +220,7 @@ public class VolatileContentRepository implements 
ContentRepository {
         }
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.incrementClaimantCount(resolveClaim(claim));
+            return 
claimManager.incrementClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().incrementClaimaintCount(backupClaim);
         }
@@ -230,7 +234,7 @@ public class VolatileContentRepository implements 
ContentRepository {
 
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.decrementClaimantCount(resolveClaim(claim));
+            return 
claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().decrementClaimantCount(backupClaim);
         }
@@ -244,7 +248,7 @@ public class VolatileContentRepository implements 
ContentRepository {
 
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
-            return claimManager.getClaimantCount(resolveClaim(claim));
+            return 
claimManager.getClaimantCount(resolveClaim(claim).getResourceClaim());
         } else {
             return getBackupRepository().getClaimantCount(backupClaim);
         }
@@ -273,6 +277,29 @@ public class VolatileContentRepository implements 
ContentRepository {
         return true;
     }
 
+    private boolean remove(final ResourceClaim claim) {
+        if (claim == null) {
+            return false;
+        }
+
+        final Set<ContentClaim> contentClaims = new HashSet<>();
+        for (final Map.Entry<ContentClaim, ContentBlock> entry : 
claimMap.entrySet()) {
+            final ContentClaim contentClaim = entry.getKey();
+            if (contentClaim.getResourceClaim().equals(claim)) {
+                contentClaims.add(contentClaim);
+            }
+        }
+
+        boolean removed = false;
+        for (final ContentClaim contentClaim : contentClaims) {
+            if (remove(contentClaim)) {
+                removed = true;
+            }
+        }
+
+        return removed;
+    }
+
     @Override
     public ContentClaim clone(final ContentClaim original, final boolean 
lossTolerant) throws IOException {
         final ContentClaim createdClaim = create(lossTolerant);
@@ -435,7 +462,7 @@ public class VolatileContentRepository implements 
ContentRepository {
     @Override
     public void purge() {
         for (final ContentClaim claim : claimMap.keySet()) {
-            claimManager.decrementClaimantCount(resolveClaim(claim));
+            
claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
             final ContentClaim backup = getBackupClaim(claim);
             if (backup != null) {
                 getBackupRepository().remove(backup);
@@ -624,7 +651,7 @@ public class VolatileContentRepository implements 
ContentRepository {
 
         @Override
         public void run() {
-            final List<ContentClaim> destructable = new ArrayList<>(1000);
+            final List<ResourceClaim> destructable = new ArrayList<>(1000);
             while (true) {
                 destructable.clear();
                 claimManager.drainDestructableClaims(destructable, 1000, 5, 
TimeUnit.SECONDS);
@@ -632,7 +659,7 @@ public class VolatileContentRepository implements 
ContentRepository {
                     return;
                 }
 
-                for (final ContentClaim claim : destructable) {
+                for (final ResourceClaim claim : destructable) {
                     remove(claim);
                 }
             }

Reply via email to