http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 1171636..0a9acc4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -23,14 +23,11 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; -import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; @@ -61,18 +58,21 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; -import org.apache.nifi.controller.repository.io.SyncOnCloseOutputStream; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import 
org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.util.file.FileUtils; +import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.LongHolder; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.StopWatch; - -import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.util.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,8 +92,11 @@ public class FileSystemRepository implements ContentRepository { private final AtomicLong index; private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true); - private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>(); private final Map<String, ContainerState> containerStateMap = new HashMap<>(); + private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB + private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100); + private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100); private final boolean archiveData; private final long maxArchiveMillis; @@ -101,7 +104,7 @@ public class FileSystemRepository implements ContentRepository { private final boolean alwaysSync; private final ScheduledExecutorService containerCleanupExecutor; - private ContentClaimManager contentClaimManager; // effectively final + private ResourceClaimManager resourceClaimManager; // effectively final 
// Map of contianer to archived files that should be deleted next. private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>(); @@ -113,7 +116,7 @@ public class FileSystemRepository implements ContentRepository { final NiFiProperties properties = NiFiProperties.getInstance(); // determine the file repository paths and ensure they exist final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths(); - for (Path path : fileRespositoryPaths.values()) { + for (final Path path : fileRespositoryPaths.values()) { Files.createDirectories(path); } @@ -122,7 +125,7 @@ public class FileSystemRepository implements ContentRepository { index = new AtomicLong(0L); for (final String containerName : containerNames) { - reclaimable.put(containerName, new LinkedBlockingQueue<ContentClaim>(10000)); + reclaimable.put(containerName, new LinkedBlockingQueue<ResourceClaim>(10000)); archivedFiles.put(containerName, new LinkedBlockingQueue<ArchiveInfo>(100000)); } @@ -196,8 +199,8 @@ public class FileSystemRepository implements ContentRepository { } @Override - public void initialize(final ContentClaimManager claimManager) { - this.contentClaimManager = claimManager; + public void initialize(final ResourceClaimManager claimManager) { + this.resourceClaimManager = claimManager; final NiFiProperties properties = NiFiProperties.getInstance(); @@ -231,6 +234,13 @@ public class FileSystemRepository implements ContentRepository { public void shutdown() { executor.shutdown(); containerCleanupExecutor.shutdown(); + + for (final OutputStream out : writableClaimStreams.values()) { + try { + out.close(); + } catch (final IOException ioe) { + } + } } private static double getRatio(final String value) { @@ -397,8 +407,8 @@ public class FileSystemRepository implements ContentRepository { final String id = idPath.toFile().getName(); final String sectionName = sectionPath.toFile().getName(); - final ContentClaim contentClaim = 
contentClaimManager.newContentClaim(containerName, sectionName, id, false); - if (contentClaimManager.getClaimantCount(contentClaim) == 0) { + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false); + if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) { removeIncompleteContent(fileToRemove); } } @@ -427,15 +437,21 @@ public class FileSystemRepository implements ContentRepository { } private Path getPath(final ContentClaim claim) { - final Path containerPath = containers.get(claim.getContainer()); + final ResourceClaim resourceClaim = claim.getResourceClaim(); + return getPath(resourceClaim); + } + + private Path getPath(final ResourceClaim resourceClaim) { + final Path containerPath = containers.get(resourceClaim.getContainer()); if (containerPath == null) { return null; } - return containerPath.resolve(claim.getSection()).resolve(claim.getId()); + return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId()); } private Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException { - final Path containerPath = containers.get(claim.getContainer()); + final ResourceClaim resourceClaim = claim.getResourceClaim(); + final Path containerPath = containers.get(resourceClaim.getContainer()); if (containerPath == null) { if (verifyExists) { throw new ContentNotFoundException(claim); @@ -445,11 +461,11 @@ public class FileSystemRepository implements ContentRepository { } // Create the Path that points to the data - Path resolvedPath = containerPath.resolve(claim.getSection()).resolve(String.valueOf(claim.getId())); + Path resolvedPath = containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId()); // If the data does not exist, create a Path that points to where the data would exist in the archive directory. 
if (!Files.exists(resolvedPath)) { - resolvedPath = getArchivePath(claim); + resolvedPath = getArchivePath(claim.getResourceClaim()); } if (verifyExists && !Files.exists(resolvedPath)) { @@ -460,34 +476,55 @@ public class FileSystemRepository implements ContentRepository { @Override public ContentClaim create(final boolean lossTolerant) throws IOException { - final long currentIndex = index.incrementAndGet(); - - String containerName = null; - boolean waitRequired = true; - ContainerState containerState = null; - for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) { - final long modulatedContainerIndex = containerIndex % containers.size(); - containerName = containerNames.get((int) modulatedContainerIndex); - - containerState = containerStateMap.get(containerName); - if (!containerState.isWaitRequired()) { - waitRequired = false; - break; - } - } + ResourceClaim resourceClaim; + + // We need to synchronize on this queue because the act of pulling something off + // the queue and incrementing the associated claimant count MUST be done atomically. + // This way, if the claimant count is decremented to 0, we can ensure that the + // claim is not then pulled from the queue and used as another thread is destroying/archiving + // the claim. 
+ final long resourceOffset; + synchronized (writableClaimQueue) { + final ClaimLengthPair pair = writableClaimQueue.poll(); + if (pair == null) { + final long currentIndex = index.incrementAndGet(); + + String containerName = null; + boolean waitRequired = true; + ContainerState containerState = null; + for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) { + final long modulatedContainerIndex = containerIndex % containers.size(); + containerName = containerNames.get((int) modulatedContainerIndex); + + containerState = containerStateMap.get(containerName); + if (!containerState.isWaitRequired()) { + waitRequired = false; + break; + } + } - if (waitRequired) { - containerState.waitForArchiveExpiration(); - } + if (waitRequired) { + containerState.waitForArchiveExpiration(); + } - final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER; - final String section = String.valueOf(modulatedSectionIndex); - final String claimId = System.currentTimeMillis() + "-" + currentIndex; + final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER; + final String section = String.valueOf(modulatedSectionIndex); + final String claimId = System.currentTimeMillis() + "-" + currentIndex; - final ContentClaim claim = contentClaimManager.newContentClaim(containerName, section, claimId, lossTolerant); - contentClaimManager.incrementClaimantCount(claim, true); + resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant); + resourceOffset = 0L; + LOG.debug("Creating new Resource Claim {}", resourceClaim); + } else { + resourceClaim = pair.getClaim(); + resourceOffset = pair.getLength(); + LOG.debug("Reusing Resource Claim {}", resourceClaim); + } + + resourceClaimManager.incrementClaimantCount(resourceClaim, true); + } - return claim; + final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset); + return scc; } @Override @@ -496,7 
+533,7 @@ public class FileSystemRepository implements ContentRepository { return 0; } - return contentClaimManager.incrementClaimantCount(claim); + return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim()); } @Override @@ -504,7 +541,7 @@ public class FileSystemRepository implements ContentRepository { if (claim == null) { return 0; } - return contentClaimManager.getClaimantCount(claim); + return resourceClaimManager.getClaimantCount(claim.getResourceClaim()); } @Override @@ -513,7 +550,7 @@ public class FileSystemRepository implements ContentRepository { return 0; } - final int claimantCount = contentClaimManager.decrementClaimantCount(claim); + final int claimantCount = resourceClaimManager.decrementClaimantCount(claim.getResourceClaim()); return claimantCount; } @@ -523,9 +560,30 @@ public class FileSystemRepository implements ContentRepository { return false; } + return remove(claim.getResourceClaim()); + } + + private boolean remove(final ResourceClaim claim) { + if (claim == null) { + return false; + } + + // we synchronize on the queue here because if the claimant count is 0, + // we need to be able to remove any instance of that resource claim from the + // queue atomically (i.e., the checking of the claimant count plus removal from the queue + // must be atomic) + synchronized (writableClaimQueue) { + final int claimantCount = resourceClaimManager.getClaimantCount(claim); + if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { + // if other content claims are claiming the same resource, we have nothing to destroy, + // so just consider the destruction successful. 
+ return true; + } + } + Path path = null; try { - path = getPath(claim, false); + path = getPath(claim); } catch (final ContentNotFoundException cnfe) { } @@ -538,6 +596,7 @@ public class FileSystemRepository implements ContentRepository { return true; } + @Override public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException { if (original == null) { @@ -545,14 +604,11 @@ public class FileSystemRepository implements ContentRepository { } final ContentClaim newClaim = create(lossTolerant); - final Path currPath = getPath(original, true); - final Path newPath = getPath(newClaim); - try (final FileOutputStream fos = new FileOutputStream(newPath.toFile())) { - Files.copy(currPath, fos); - if (alwaysSync) { - fos.getFD().sync(); - } + try (final InputStream in = read(original); + final OutputStream out = write(newClaim)) { + StreamUtils.copy(in, out); } catch (final IOException ioe) { + decrementClaimantCount(newClaim); remove(newClaim); throw ioe; } @@ -564,44 +620,28 @@ public class FileSystemRepository implements ContentRepository { if (claims.contains(destination)) { throw new IllegalArgumentException("destination cannot be within claims"); } - try (final FileChannel dest = FileChannel.open(getPath(destination), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { - long position = 0L; - if (header != null && header.length > 0) { - final ByteBuffer buffer = ByteBuffer.wrap(header); - while (buffer.hasRemaining()) { - position += dest.write(buffer, position); - } + + try (final ByteCountingOutputStream out = new ByteCountingOutputStream(write(destination))) { + if (header != null) { + out.write(header); } - int objectIndex = 0; + + int i = 0; for (final ContentClaim claim : claims) { - long totalCopied = 0L; - try (final FileChannel src = FileChannel.open(getPath(claim, true), StandardOpenOption.READ)) { - while (totalCopied < src.size()) { - final long copiedThisIteration = dest.transferFrom(src, position, 
Long.MAX_VALUE); - totalCopied += copiedThisIteration; - position += copiedThisIteration; - } + try (final InputStream in = read(claim)) { + StreamUtils.copy(in, out); } - // don't add demarcator after the last claim - if (demarcator != null && demarcator.length > 0 && (++objectIndex < claims.size())) { - final ByteBuffer buffer = ByteBuffer.wrap(demarcator); - while (buffer.hasRemaining()) { - position += dest.write(buffer, position); - } - } - } - if (footer != null && footer.length > 0) { - final ByteBuffer buffer = ByteBuffer.wrap(footer); - while (buffer.hasRemaining()) { - position += dest.write(buffer, position); + + if (++i < claims.size() && demarcator != null) { + out.write(demarcator); } } - if (alwaysSync) { - dest.force(true); + if (footer != null) { + out.write(footer); } - return position; + return out.getBytesWritten(); } } @@ -624,12 +664,8 @@ public class FileSystemRepository implements ContentRepository { @Override public long importFrom(final InputStream content, final ContentClaim claim, final boolean append) throws IOException { - try (final FileOutputStream out = new FileOutputStream(getPath(claim).toFile(), append)) { - final long copied = StreamUtils.copy(content, out); - if (alwaysSync) { - out.getFD().sync(); - } - return copied; + try (final OutputStream out = write(claim, append)) { + return StreamUtils.copy(content, out); } } @@ -642,20 +678,14 @@ public class FileSystemRepository implements ContentRepository { Files.createFile(destination); return 0L; } - if (append) { - try (final FileChannel sourceChannel = FileChannel.open(getPath(claim, true), StandardOpenOption.READ); - final FileChannel destinationChannel = FileChannel.open(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) { - long position = destinationChannel.size(); - final long targetSize = position + sourceChannel.size(); - while (position < targetSize) { - final long bytesCopied = destinationChannel.transferFrom(sourceChannel, position, Long.MAX_VALUE); - 
position += bytesCopied; - } - return position; + + try (final InputStream in = read(claim); + final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) { + final long copied = StreamUtils.copy(in, fos); + if (alwaysSync) { + fos.getFD().sync(); } - } else { - Files.copy(getPath(claim, true), destination, StandardCopyOption.REPLACE_EXISTING); - return Files.size(destination); + return copied; } } @@ -674,28 +704,20 @@ public class FileSystemRepository implements ContentRepository { final long claimSize = size(claim); if (offset > claimSize) { - throw new IllegalArgumentException("offset of " + offset + " exceeds claim size of " + claimSize); + throw new IllegalArgumentException("Offset of " + offset + " exceeds claim size of " + claimSize); } - if (append) { - try (final InputStream sourceStream = Files.newInputStream(getPath(claim, true), StandardOpenOption.READ); - final OutputStream destinationStream = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) { - StreamUtils.skip(sourceStream, offset); - - final byte[] buffer = new byte[8192]; - int len; - long copied = 0L; - while ((len = sourceStream.read(buffer, 0, (int) Math.min(length - copied, buffer.length))) > 0) { - destinationStream.write(buffer, 0, len); - copied += len; - } - return copied; + try (final InputStream in = read(claim); + final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) { + if (offset > 0) { + StreamUtils.skip(in, offset); } - } else { - try (final OutputStream out = Files.newOutputStream(destination, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { - return exportTo(claim, out, offset, length); + StreamUtils.copy(in, fos, length); + if (alwaysSync) { + fos.getFD().sync(); } + return length; } } @@ -704,7 +726,10 @@ public class FileSystemRepository implements ContentRepository { if (claim == null) { return 0L; } - return Files.copy(getPath(claim, true), destination); + + try (final InputStream 
in = read(claim)) { + return StreamUtils.copy(in, destination); + } } @Override @@ -719,7 +744,7 @@ public class FileSystemRepository implements ContentRepository { if (offset == 0 && length == claimSize) { return exportTo(claim, destination); } - try (final InputStream in = Files.newInputStream(getPath(claim, true))) { + try (final InputStream in = read(claim)) { StreamUtils.skip(in, offset); final byte[] buffer = new byte[8192]; int len; @@ -738,7 +763,12 @@ public class FileSystemRepository implements ContentRepository { return 0L; } - return Files.size(getPath(claim, true)); + // see javadocs for claim.getLength() as to why we do this. + if (claim.getLength() < 0) { + return Files.size(getPath(claim, true)) - claim.getOffset(); + } + + return claim.getLength(); } @Override @@ -747,16 +777,194 @@ public class FileSystemRepository implements ContentRepository { return new ByteArrayInputStream(new byte[0]); } final Path path = getPath(claim, true); - return new FileInputStream(path.toFile()); + final FileInputStream fis = new FileInputStream(path.toFile()); + if (claim.getOffset() > 0L) { + StreamUtils.skip(fis, claim.getOffset()); + } + + // see javadocs for claim.getLength() as to why we do this. + if (claim.getLength() >= 0) { + return new LimitedInputStream(fis, claim.getLength()); + } else { + return fis; + } } @Override - @SuppressWarnings("resource") public OutputStream write(final ContentClaim claim) throws IOException { - final FileOutputStream fos = new FileOutputStream(getPath(claim).toFile()); - return alwaysSync ? 
new SyncOnCloseOutputStream(fos) : fos; + return write(claim, false); } + private OutputStream write(final ContentClaim claim, final boolean append) throws IOException { + if (claim == null) { + throw new NullPointerException("ContentClaim cannot be null"); + } + + if (!(claim instanceof StandardContentClaim)) { + // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything + // else, just throw an Exception because it is not valid for this Repository + throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Content Repository"); + } + + final StandardContentClaim scc = (StandardContentClaim) claim; + if (claim.getLength() > 0) { + throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to."); + } + + // we always append because there may be another ContentClaim using the same resource claim. + // However, we know that we will never write to the same claim from two different threads + // at the same time because we will call create() to get the claim before we write to it, + // and when we call create(), it will remove it from the Queue, which means that no other + // thread will get the same Claim until we've finished writing to it. + ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim()); + final long initialLength; + if (claimStream == null) { + final File file = getPath(scc).toFile(); + // use a synchronized stream because we want to pass this OutputStream out from one thread to another. 
+ claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length()); + initialLength = 0L; + } else { + if (append) { + initialLength = Math.max(0, scc.getLength()); + } else { + initialLength = 0; + } + } + + final ByteCountingOutputStream bcos = claimStream; + final OutputStream out = new OutputStream() { + private long bytesWritten = 0L; + private boolean recycle = true; + private boolean closed = false; + + @Override + public String toString() { + return "FileSystemRepository Stream [" + scc + "]"; + } + + @Override + public synchronized void write(final int b) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + + try { + bcos.write(b); + } catch (final IOException ioe) { + recycle = false; + throw new IOException("Failed to write to " + this, ioe); + } + + bytesWritten++; + scc.setLength(bytesWritten + initialLength); + } + + @Override + public synchronized void write(final byte[] b) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + + try { + bcos.write(b); + } catch (final IOException ioe) { + recycle = false; + throw new IOException("Failed to write to " + this, ioe); + } + + bytesWritten += b.length; + scc.setLength(bytesWritten + initialLength); + } + + @Override + public synchronized void write(final byte[] b, final int off, final int len) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + + try { + bcos.write(b, off, len); + } catch (final IOException ioe) { + recycle = false; + throw new IOException("Failed to write to " + this, ioe); + } + + bytesWritten += len; + scc.setLength(bytesWritten + initialLength); + } + + @Override + public synchronized void flush() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + + bcos.flush(); + } + + @Override + public synchronized void close() throws IOException { + closed = true; + + if (alwaysSync) { + ((FileOutputStream) 
bcos.getWrappedStream()).getFD().sync(); + } + + if (scc.getLength() < 0) { + // If claim was not written to, set length to 0 + scc.setLength(0L); + } + + // if we've not yet hit the threshold for appending to a resource claim, add the claim + // to the writableClaimQueue so that the Resource Claim can be used again when create() + // is called. In this case, we don't have to actually close the file stream. Instead, we + // can just add it onto the queue and continue to use it for the next content claim. + final long resourceClaimLength = scc.getOffset() + scc.getLength(); + if (recycle && resourceClaimLength < maxAppendClaimLength) { + // we do not have to synchronize on the writable claim queue here because we + // are only adding something to the queue. We must synchronize if we are + // using a ResourceClaim from the queue and incrementing the claimant count on that resource + // because those need to be done atomically, or if we are destroying a claim that is on + // the queue because we need to ensure that the latter operation does not cause problems + // with the former. + final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength); + final boolean enqueued = writableClaimQueue.offer(pair); + + if (enqueued) { + writableClaimStreams.put(scc.getResourceClaim(), bcos); + LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this); + } else { + bcos.close(); + + LOG.debug("Claim length less than max; Closing {} because could not add back to queue", this); + if (LOG.isTraceEnabled()) { + LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this)); + } + } + } else { + // we've reached the limit for this claim. Don't add it back to our queue. + // Instead, just remove it and move on. 
+ + // ensure that the claim is no longer on the queue + writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength)); + bcos.close(); + LOG.debug("Claim lenth >= max; Closing {}", this); + if (LOG.isTraceEnabled()) { + LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this)); + } + } + } + }; + + LOG.debug("Writing to {}", out); + if (LOG.isTraceEnabled()) { + LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for writing to " + out)); + } + + return out; + } + + @Override public void purge() { // delete all content from repositories @@ -788,7 +996,7 @@ public class FileSystemRepository implements ContentRepository { } } - contentClaimManager.purge(); + resourceClaimManager.purge(); } private class BinDestructableClaims implements Runnable { @@ -800,17 +1008,17 @@ public class FileSystemRepository implements ContentRepository { // because the Container generally maps to a physical partition on the disk, so we want a few // different threads hitting the different partitions but don't want multiple threads hitting // the same partition. 
- final List<ContentClaim> toDestroy = new ArrayList<>(); + final List<ResourceClaim> toDestroy = new ArrayList<>(); while (true) { toDestroy.clear(); - contentClaimManager.drainDestructableClaims(toDestroy, 10000); + resourceClaimManager.drainDestructableClaims(toDestroy, 10000); if (toDestroy.isEmpty()) { return; } - for (final ContentClaim claim : toDestroy) { + for (final ResourceClaim claim : toDestroy) { final String container = claim.getContainer(); - final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container); + final BlockingQueue<ResourceClaim> claimQueue = reclaimable.get(container); try { while (true) { @@ -838,7 +1046,7 @@ public class FileSystemRepository implements ContentRepository { return sectionPath.resolve(ARCHIVE_DIR_NAME).resolve(claimId); } - private Path getArchivePath(final ContentClaim claim) { + private Path getArchivePath(final ResourceClaim claim) { final String claimId = claim.getId(); final Path containerPath = containers.get(claim.getContainer()); final Path archivePath = containerPath.resolve(claim.getSection()).resolve(ARCHIVE_DIR_NAME).resolve(claimId); @@ -859,22 +1067,28 @@ public class FileSystemRepository implements ContentRepository { return true; } - return Files.exists(getArchivePath(contentClaim)); + return Files.exists(getArchivePath(contentClaim.getResourceClaim())); } - private void archive(final ContentClaim contentClaim) throws IOException { + private void archive(final ResourceClaim claim) throws IOException { if (!archiveData) { return; } - final int claimantCount = getClaimantCount(contentClaim); - if (claimantCount > 0) { - throw new IllegalStateException("Cannot archive ContentClaim " + contentClaim + " because it is currently in use"); + synchronized (writableClaimQueue) { + final int claimantCount = claim == null ? 
0 : resourceClaimManager.getClaimantCount(claim); + if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { + return; + } + } + + final Path curPath = getPath(claim); + if (curPath == null) { + return; } - final Path curPath = getPath(contentClaim, true); archive(curPath); - LOG.debug("Successfully moved {} to archive", contentClaim); + LOG.debug("Successfully moved {} to archive", claim); } private void archive(final Path curPath) throws IOException { @@ -1107,8 +1321,8 @@ public class FileSystemRepository implements ContentRepository { while (true) { // look through each of the binned queues of Content Claims int successCount = 0; - final List<ContentClaim> toRemove = new ArrayList<>(); - for (final Map.Entry<String, BlockingQueue<ContentClaim>> entry : reclaimable.entrySet()) { + final List<ResourceClaim> toRemove = new ArrayList<>(); + for (final Map.Entry<String, BlockingQueue<ResourceClaim>> entry : reclaimable.entrySet()) { // drain the queue of all ContentClaims that can be destroyed for the given container. 
final String container = entry.getKey(); final ContainerState containerState = containerStateMap.get(container); @@ -1121,7 +1335,7 @@ public class FileSystemRepository implements ContentRepository { // destroy each claim for this container final long start = System.nanoTime(); - for (final ContentClaim claim : toRemove) { + for (final ResourceClaim claim : toRemove) { if (archiveData) { try { archive(claim); @@ -1210,7 +1424,7 @@ public class FileSystemRepository implements ContentRepository { @Override public void run() { try { - if (oldestArchiveDate.get() > (System.currentTimeMillis() - maxArchiveMillis)) { + if (oldestArchiveDate.get() > System.currentTimeMillis() - maxArchiveMillis) { final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName); if (minRequiredSpace == null) { return; @@ -1245,7 +1459,7 @@ public class FileSystemRepository implements ContentRepository { if (oldestContainerArchive < 0L) { boolean updated; do { - long oldest = oldestArchiveDate.get(); + final long oldest = oldestArchiveDate.get(); if (oldestContainerArchive < oldest) { updated = oldestArchiveDate.compareAndSet(oldest, oldestContainerArchive); @@ -1298,7 +1512,7 @@ public class FileSystemRepository implements ContentRepository { final long free = getContainerUsableSpace(containerName); used = capacity - free; bytesUsed = used; - } catch (IOException e) { + } catch (final IOException e) { return false; } } @@ -1317,7 +1531,7 @@ public class FileSystemRepository implements ContentRepository { try { LOG.info("Unable to write to container {} due to archive file size constraints; waiting for archive cleanup", containerName); condition.await(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { } } } finally { @@ -1355,4 +1569,55 @@ public class FileSystemRepository implements ContentRepository { } } + + private static class ClaimLengthPair { + private final ResourceClaim claim; + private final Long length; + + public 
ClaimLengthPair(final ResourceClaim claim, final Long length) { + this.claim = claim; + this.length = length; + } + + public ResourceClaim getClaim() { + return claim; + } + + public Long getLength() { + return length; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (claim == null ? 0 : claim.hashCode()); + return result; + } + + /** + * Equality is determined purely by the ResourceClaim's equality + * + * @param obj the object to compare against + * @return true if obj is a ClaimLengthPair whose ResourceClaim is equal to this pair's ResourceClaim, false otherwise + */ + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + final ClaimLengthPair other = (ClaimLengthPair) obj; + return claim.equals(other.getClaim()); + } + } + }
http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java index 6524cd3..cc8c734 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java @@ -95,7 +95,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { @Override public boolean isPenalized() { - return (penaltyExpirationMs > 0) ? penaltyExpirationMs > System.currentTimeMillis() : false; + return penaltyExpirationMs > 0 ? penaltyExpirationMs > System.currentTimeMillis() : false; } @Override @@ -150,7 +150,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { public String toString() { final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE); builder.append("uuid", getAttribute(CoreAttributes.UUID.key())); - builder.append("claim", claim == null ? "" : claim.getId()); + builder.append("claim", claim == null ? 
"" : claim.toString()); builder.append("offset", claimOffset); builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size); return builder.toString(); @@ -169,7 +169,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { private final Set<String> bLineageIdentifiers = new HashSet<>(); private long bPenaltyExpirationMs = -1L; private long bSize = 0L; - private Map<String, String> bAttributes = new HashMap<>(); + private final Map<String, String> bAttributes = new HashMap<>(); private ContentClaim bClaim = null; private long bClaimOffset = 0L; private long bLastQueueDate = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 210b620..0e08325 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -44,6 +44,7 @@ import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowFileQueue; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.io.ByteCountingInputStream; import 
org.apache.nifi.controller.repository.io.ByteCountingOutputStream; import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream; @@ -54,10 +55,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.controller.repository.io.LongHolder; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.NonCloseableInputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.QueueSize; @@ -75,7 +72,8 @@ import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.StandardProvenanceEventRecord; -import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,10 +92,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback public static final int VERBOSE_LOG_THRESHOLD = 10; - private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize( - NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue(); - private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim(); - public static final String DEFAULT_FLOWFILE_PATH = "./"; private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class); @@ -126,12 +120,6 @@ public final class StandardProcessSession implements ProcessSession, 
ProvenanceE private final LongHolder bytesWritten = new LongHolder(0L); private int flowFilesIn = 0, flowFilesOut = 0; private long contentSizeIn = 0L, contentSizeOut = 0L; - private int writeRecursionLevel = 0; - - private ContentClaim currentWriteClaim = null; - private OutputStream currentWriteClaimStream = null; - private long currentWriteClaimSize = 0L; - private int currentWriteClaimFlowFileCount = 0; private ContentClaim currentReadClaim = null; private ByteCountingInputStream currentReadClaimStream = null; @@ -706,12 +694,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (originalClaim == null) { builder.setCurrentContentClaim(null, null, null, null, 0L); } else { + final ResourceClaim resourceClaim = originalClaim.getResourceClaim(); builder.setCurrentContentClaim( - originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() - ); + resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), + repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize()); builder.setPreviousContentClaim( - originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() - ); + resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), + repoRecord.getOriginal().getContentClaimOffset() + originalClaim.getOffset(), repoRecord.getOriginal().getSize()); } } @@ -727,14 +716,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ContentClaim currentClaim = repoRecord.getCurrentClaim(); final long currentOffset = repoRecord.getCurrentClaimOffset(); final long size = flowFile.getSize(); - recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), 
currentOffset, size); + + final ResourceClaim resourceClaim = currentClaim.getResourceClaim(); + recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size); } if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) { final ContentClaim originalClaim = repoRecord.getOriginalClaim(); final long originalOffset = repoRecord.getOriginal().getContentClaimOffset(); final long originalSize = repoRecord.getOriginal().getSize(); - recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize); + + final ResourceClaim resourceClaim = originalClaim.getResourceClaim(); + recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize); } final FlowFileQueue originalQueue = repoRecord.getOriginalQueue(); @@ -757,14 +750,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ContentClaim currentClaim = repoRecord.getCurrentClaim(); final long currentOffset = repoRecord.getCurrentClaimOffset(); final long size = eventFlowFile.getSize(); - recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size); + + final ResourceClaim resourceClaim = currentClaim.getResourceClaim(); + recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), currentOffset + currentClaim.getOffset(), size); } if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) { final ContentClaim originalClaim = repoRecord.getOriginalClaim(); final long originalOffset = repoRecord.getOriginal().getContentClaimOffset(); final long originalSize = repoRecord.getOriginal().getSize(); - 
recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize); + + final ResourceClaim resourceClaim = originalClaim.getResourceClaim(); + recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), originalOffset + originalClaim.getOffset(), originalSize); } final FlowFileQueue originalQueue = repoRecord.getOriginalQueue(); @@ -1670,8 +1667,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ContentClaim claim = record.getContentClaim(); if (claim != null) { - enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize()); - enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize()); + final ResourceClaim resourceClaim = claim.getResourceClaim(); + enriched.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), + record.getContentClaimOffset() + claim.getOffset(), record.getSize()); + enriched.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), + record.getContentClaimOffset() + claim.getOffset(), record.getSize()); } enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap()); @@ -1715,7 +1715,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE StreamUtils.skip(currentReadClaimStream, bytesToSkip); } - return new NonCloseableInputStream(currentReadClaimStream); + return new DisableOnCloseInputStream(currentReadClaimStream); } } @@ -1731,7 +1731,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can // reuse the same InputStream 
for the next FlowFile - return new NonCloseableInputStream(currentReadClaimStream); + return new DisableOnCloseInputStream(currentReadClaimStream); } else { final InputStream rawInStream = context.getContentRepository().read(claim); StreamUtils.skip(rawInStream, offset); @@ -1882,30 +1882,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return newFile; } - private void enforceCurrentWriteClaimState() { - if (currentWriteClaimFlowFileCount > MAX_FLOWFILES_PER_CLAIM || currentWriteClaimSize > MAX_APPENDABLE_CLAIM_SIZE) { - resetWriteClaims(); - } - - if (currentWriteClaimStream == null) { - try { - currentWriteClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); - claimLog.debug("Creating ContentClaim {} to enforce Current Write Claim State for {}", currentWriteClaim, context.getConnectable()); - } catch (final IOException e) { - throw new FlowFileHandlingException("Unable to create ContentClaim due to " + e.toString(), e); - } - - try { - currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim); - } catch (final IOException e) { - resetWriteClaims(); - throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e); - } - } else { - context.getContentRepository().incrementClaimaintCount(currentWriteClaim); - } - } - private void ensureNotAppending(final ContentClaim claim) throws IOException { if (claim == null) { return; @@ -1924,79 +1900,36 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public FlowFile write(final FlowFile source, final OutputStreamCallback writer) { validateRecordState(source); final StandardRepositoryRecord record = records.get(source); - long newSize = 0L; - long claimOffset = 0L; ContentClaim newClaim = null; final LongHolder writtenHolder = new LongHolder(0L); - final boolean appendToClaim = isMergeContent(); try { - if (appendToClaim) { - 
enforceCurrentWriteClaimState(); - claimOffset = currentWriteClaimSize; - newClaim = currentWriteClaim; - ensureNotAppending(newClaim); - - try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { - - recursionSet.add(source); - - writeRecursionLevel++; - try { - writer.process(new FlowFileAccessOutputStream(countingOut, source)); - } finally { - writeRecursionLevel--; - } - } finally { - recursionSet.remove(source); - } - - final long writtenThisCall = writtenHolder.getValue(); - newSize = writtenThisCall; - currentWriteClaimSize += newSize; - } else { - newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); - claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); + newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); + claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); - ensureNotAppending(newClaim); - try (final OutputStream stream = context.getContentRepository().write(newClaim); - final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) { - recursionSet.add(source); - - writeRecursionLevel++; - try { - writer.process(new FlowFileAccessOutputStream(countingOut, source)); - } finally { - writeRecursionLevel--; - } - } finally { - recursionSet.remove(source); - } - newSize = context.getContentRepository().size(newClaim); + ensureNotAppending(newClaim); + try (final OutputStream stream = context.getContentRepository().write(newClaim); + final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream); + final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { + recursionSet.add(source); + writer.process(new FlowFileAccessOutputStream(countingOut, source)); + } finally { + recursionSet.remove(source); } } catch (final 
ContentNotFoundException nfe) { - if (appendToClaim) { - resetWriteClaims(); // need to reset write claim before we can remove the claim - } + resetWriteClaims(); // need to reset write claim before we can remove the claim destroyContent(newClaim); handleContentNotFound(nfe, record); } catch (final FlowFileAccessException ffae) { - if (appendToClaim) { - resetWriteClaims(); // need to reset write claim before we can remove the claim - } + resetWriteClaims(); // need to reset write claim before we can remove the claim destroyContent(newClaim); throw ffae; } catch (final IOException ioe) { - if (appendToClaim) { - resetWriteClaims(); // need to reset write claim before we can remove the claim - } + resetWriteClaims(); // need to reset write claim before we can remove the claim destroyContent(newClaim); throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe); } catch (final Throwable t) { - if (appendToClaim) { - resetWriteClaims(); // need to reset write claim before we can remove the claim - } + resetWriteClaims(); // need to reset write claim before we can remove the claim destroyContent(newClaim); throw t; } finally { @@ -2004,7 +1937,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } removeTemporaryClaim(record); - final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build(); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getCurrent()) + .contentClaim(newClaim) + .contentClaimOffset(0) + .size(writtenHolder.getValue()) + .build(); + record.setWorking(newFile); return newFile; } @@ -2041,13 +1980,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // wrap our OutputStreams so that the processor cannot close it try (final OutputStream disableOnClose = new 
DisableOnCloseOutputStream(outStream)) { recursionSet.add(source); - - writeRecursionLevel++; - try { - writer.process(new FlowFileAccessOutputStream(disableOnClose, source)); - } finally { - writeRecursionLevel--; - } + writer.process(new FlowFileAccessOutputStream(disableOnClose, source)); } finally { recursionSet.remove(source); } @@ -2059,13 +1992,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // wrap our OutputStreams so that the processor cannot close it try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) { recursionSet.add(source); - - writeRecursionLevel++; - try { - writer.process(disableOnClose); - } finally { - writeRecursionLevel--; - } + writer.process(disableOnClose); } finally { recursionSet.remove(source); } @@ -2142,25 +2069,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private void resetWriteClaims(final boolean suppressExceptions) { - try { - if (currentWriteClaimStream != null) { - try { - currentWriteClaimStream.flush(); - } finally { - currentWriteClaimStream.close(); - } - } - } catch (final IOException e) { - if (!suppressExceptions) { - throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository"); - } - } - - currentWriteClaimStream = null; - currentWriteClaim = null; - currentWriteClaimFlowFileCount = 0; - currentWriteClaimSize = 0L; - for (final ByteCountingOutputStream out : appendableStreams.values()) { try { try { @@ -2188,16 +2096,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE currentReadClaim = null; } - /** - * @return Indicates whether or not multiple FlowFiles should be merged into - * a single ContentClaim - */ - private boolean isMergeContent() { - if (writeRecursionLevel > 0) { - return false; - } - return true; - } @Override public FlowFile write(final FlowFile source, final StreamCallback writer) { @@ -2206,117 +2104,55 @@ public final 
class StandardProcessSession implements ProcessSession, ProvenanceE final ContentClaim currClaim = record.getCurrentClaim(); ContentClaim newClaim = null; - long newSize = 0L; - long claimOffset = 0L; final LongHolder writtenHolder = new LongHolder(0L); - final boolean appendToClaim = isMergeContent(); try { - if (appendToClaim) { - enforceCurrentWriteClaimState(); - claimOffset = currentWriteClaimSize; - newClaim = currentWriteClaim; - ensureNotAppending(newClaim); - - try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); - final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); - final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { - - recursionSet.add(source); - - // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from - // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository - // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any - // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it - // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it. 
- final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim); - boolean cnfeThrown = false; - - writeRecursionLevel++; - try { - writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source)); - } catch (final ContentNotFoundException cnfe) { - cnfeThrown = true; - throw cnfe; - } finally { - writeRecursionLevel--; - recursionSet.remove(source); - - // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. - if (!cnfeThrown && ffais.getContentNotFoundException() != null) { - throw ffais.getContentNotFoundException(); - } - } - } + newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); + claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); - final long writtenThisCall = writtenHolder.getValue(); - newSize = writtenThisCall; - currentWriteClaimSize += writtenThisCall; - } else { - newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); - claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); + ensureNotAppending(newClaim); - ensureNotAppending(newClaim); + try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset()); + final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); + final OutputStream os = context.getContentRepository().write(newClaim); + final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os); + final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { - try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); - final InputStream countingIn = new 
ByteCountingInputStream(limitedIn, bytesRead); - final OutputStream os = context.getContentRepository().write(newClaim); - final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) { + recursionSet.add(source); - recursionSet.add(source); + // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from + // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository + // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any + // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it + // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it. + final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim); + boolean cnfeThrown = false; - // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from - // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository - // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any - // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it - // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it. 
- final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingIn, source, currClaim); - boolean cnfeThrown = false; - - writeRecursionLevel++; - try { - writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source)); - } catch (final ContentNotFoundException cnfe) { - cnfeThrown = true; - throw cnfe; - } finally { - writeRecursionLevel--; - recursionSet.remove(source); + try { + writer.process(ffais, new FlowFileAccessOutputStream(countingOut, source)); + } catch (final ContentNotFoundException cnfe) { + cnfeThrown = true; + throw cnfe; + } finally { + recursionSet.remove(source); - // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. - if (!cnfeThrown && ffais.getContentNotFoundException() != null) { - throw ffais.getContentNotFoundException(); - } + // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. + if (!cnfeThrown && ffais.getContentNotFoundException() != null) { + throw ffais.getContentNotFoundException(); } } - - newSize = context.getContentRepository().size(newClaim); } } catch (final ContentNotFoundException nfe) { - if (appendToClaim) { - resetWriteClaims(); // need to reset write claim before we can remove the claim - } destroyContent(newClaim); handleContentNotFound(nfe, record); } catch (final IOException ioe) { - if (appendToClaim) { - resetWriteClaims(); // need to reset write claim before we can remove the claim - } destroyContent(newClaim); throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe); } catch (final FlowFileAccessException ffae) { - if (appendToClaim) { - resetWriteClaims(); - } destroyContent(newClaim); throw ffae; } catch (final Throwable t) { - if (appendToClaim) { - resetWriteClaims(); // need to reset write claim before we can remove the claim - } destroyContent(newClaim); throw t; } finally { @@ -2324,7 +2160,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE } removeTemporaryClaim(record); - final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build(); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getCurrent()) + .contentClaim(newClaim) + .contentClaimOffset(0L) + .size(writtenHolder.getValue()) + .build(); + record.setWorking(newFile); return newFile; } @@ -2343,33 +2185,20 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ContentClaim newClaim; final long claimOffset; - final boolean appendToClaim = isMergeContent(); - if (appendToClaim) { - enforceCurrentWriteClaimState(); - newClaim = currentWriteClaim; - claimOffset = currentWriteClaimSize; - } else { - try { - newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); - claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); - } catch (final IOException e) { - throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); - } - - claimOffset = 0L; + try { + newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); + claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); + } catch (final IOException e) { + throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); } + claimOffset = 0L; long newSize = 0L; try { - final boolean append = isMergeContent(); - newSize = context.getContentRepository().importFrom(source, newClaim, append); + newSize = context.getContentRepository().importFrom(source, newClaim); bytesWritten.increment(newSize); bytesRead.increment(newSize); - currentWriteClaimSize += newSize; } catch (final Throwable t) { - if (appendToClaim) { - resetWriteClaims(); - } destroyContent(newClaim); throw new 
FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t); } @@ -2392,40 +2221,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE validateRecordState(destination); final StandardRepositoryRecord record = records.get(destination); ContentClaim newClaim = null; - long claimOffset = 0L; + final long claimOffset = 0L; final long newSize; - final boolean appendToClaim = isMergeContent(); try { - if (appendToClaim) { - enforceCurrentWriteClaimState(); - newClaim = currentWriteClaim; - claimOffset = currentWriteClaimSize; - - final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream); - bytesWritten.increment(bytesCopied); - currentWriteClaimSize += bytesCopied; - newSize = bytesCopied; - } else { - try { - newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); - claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); + try { + newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); + claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); - newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim); - bytesWritten.increment(newSize); - currentWriteClaimSize += newSize; - } catch (final IOException e) { - throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); - } + newSize = context.getContentRepository().importFrom(source, newClaim); + bytesWritten.increment(newSize); + } catch (final IOException e) { + throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); } } catch (final Throwable t) { - if (appendToClaim) { - resetWriteClaims(); - } - if (newClaim != null) { destroyContent(newClaim); } + throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), 
t); } http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 3bfdd8a..6c1626c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -27,8 +27,10 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream; import org.apache.nifi.controller.repository.io.MemoryManager; @@ -92,7 +95,7 @@ public class VolatileContentRepository 
implements ContentRepository { private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256); private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null); - private ContentClaimManager claimManager; // effectively final + private ResourceClaimManager claimManager; // effectively final public VolatileContentRepository() { this(NiFiProperties.getInstance()); @@ -119,7 +122,7 @@ public class VolatileContentRepository implements ContentRepository { } @Override - public void initialize(final ContentClaimManager claimManager) { + public void initialize(final ResourceClaimManager claimManager) { this.claimManager = claimManager; for (int i = 0; i < 3; i++) { @@ -199,9 +202,10 @@ public class VolatileContentRepository implements ContentRepository { private ContentClaim createLossTolerant() { final long id = idGenerator.getAndIncrement(); - final ContentClaim claim = claimManager.newContentClaim(CONTAINER_NAME, "section", String.valueOf(id), true); + final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true); + final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L); final ContentBlock contentBlock = new ContentBlock(claim, repoSize); - claimManager.incrementClaimantCount(claim, true); + claimManager.incrementClaimantCount(resourceClaim, true); claimMap.put(claim, contentBlock); @@ -216,7 +220,7 @@ public class VolatileContentRepository implements ContentRepository { } final ContentClaim backupClaim = getBackupClaim(claim); if (backupClaim == null) { - return claimManager.incrementClaimantCount(resolveClaim(claim)); + return claimManager.incrementClaimantCount(resolveClaim(claim).getResourceClaim()); } else { return getBackupRepository().incrementClaimaintCount(backupClaim); } @@ -230,7 +234,7 @@ public class VolatileContentRepository implements ContentRepository { final ContentClaim backupClaim = 
getBackupClaim(claim); if (backupClaim == null) { - return claimManager.decrementClaimantCount(resolveClaim(claim)); + return claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim()); } else { return getBackupRepository().decrementClaimantCount(backupClaim); } @@ -244,7 +248,7 @@ public class VolatileContentRepository implements ContentRepository { final ContentClaim backupClaim = getBackupClaim(claim); if (backupClaim == null) { - return claimManager.getClaimantCount(resolveClaim(claim)); + return claimManager.getClaimantCount(resolveClaim(claim).getResourceClaim()); } else { return getBackupRepository().getClaimantCount(backupClaim); } @@ -273,6 +277,29 @@ public class VolatileContentRepository implements ContentRepository { return true; } + private boolean remove(final ResourceClaim claim) { + if (claim == null) { + return false; + } + + final Set<ContentClaim> contentClaims = new HashSet<>(); + for (final Map.Entry<ContentClaim, ContentBlock> entry : claimMap.entrySet()) { + final ContentClaim contentClaim = entry.getKey(); + if (contentClaim.getResourceClaim().equals(claim)) { + contentClaims.add(contentClaim); + } + } + + boolean removed = false; + for (final ContentClaim contentClaim : contentClaims) { + if (remove(contentClaim)) { + removed = true; + } + } + + return removed; + } + @Override public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException { final ContentClaim createdClaim = create(lossTolerant); @@ -435,7 +462,7 @@ public class VolatileContentRepository implements ContentRepository { @Override public void purge() { for (final ContentClaim claim : claimMap.keySet()) { - claimManager.decrementClaimantCount(resolveClaim(claim)); + claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim()); final ContentClaim backup = getBackupClaim(claim); if (backup != null) { getBackupRepository().remove(backup); @@ -624,7 +651,7 @@ public class VolatileContentRepository implements 
ContentRepository { @Override public void run() { - final List<ContentClaim> destructable = new ArrayList<>(1000); + final List<ResourceClaim> destructable = new ArrayList<>(1000); while (true) { destructable.clear(); claimManager.drainDestructableClaims(destructable, 1000, 5, TimeUnit.SECONDS); @@ -632,7 +659,7 @@ public class VolatileContentRepository implements ContentRepository { return; } - for (final ContentClaim claim : destructable) { + for (final ResourceClaim claim : destructable) { remove(claim); } }
