Repository: nifi Updated Branches: refs/heads/NIFI-744 b8cee5105 -> 53a6e962d
NIFI-744: Do not allow StandardContentClaim's offset to be updated. Data should be read-only once it has been written Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/53a6e962 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/53a6e962 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/53a6e962 Branch: refs/heads/NIFI-744 Commit: 53a6e962d6c8a29e80c61e9f75af20cae9abe25d Parents: b8cee51 Author: Mark Payne <[email protected]> Authored: Fri Jul 31 13:48:32 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Fri Jul 31 13:48:32 2015 -0400 ---------------------------------------------------------------------- .../repository/ContentRepository.java | 10 ++- .../repository/FileSystemRepository.java | 13 ++-- .../repository/StandardProcessSession.java | 27 ++++--- .../repository/claim/StandardContentClaim.java | 18 ++--- .../repository/TestFileSystemRepository.java | 76 -------------------- 5 files changed, 37 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java index da87d75..8d0bdb3 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java @@ -173,9 +173,13 @@ public interface ContentRepository { * @param content to import from * @param claim the claim to write imported content to * @param append if true, the content will be appended to the claim; if - * false, the content will replace the contents of the claim + * false, the content will replace the contents of the claim * @throws IOException if unable to read content + * + * @deprecated if needing to append to a content claim, the contents of the claim should be + * copied to a new claim and then the data to append should be written to that new claim. */ + @Deprecated long importFrom(Path content, ContentClaim claim, boolean append) throws IOException; /** @@ -198,7 +202,11 @@ public interface ContentRepository { * @param append whether to append or replace * @return length of data imported in bytes * @throws IOException if failure to read or write stream + * + * @deprecated if needing to append to a content claim, the contents of the claim should be + * copied to a new claim and then the data to append should be written to that new claim. */ + @Deprecated long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 3a6338c..ced198c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -764,7 +764,7 @@ public class FileSystemRepository implements ContentRepository { // see javadocs for claim.getLength() as to why we do this. if (claim.getLength() < 0) { - return Files.size(getPath(claim, true)); + return Files.size(getPath(claim, true)) - claim.getOffset(); } return claim.getLength(); @@ -806,6 +806,9 @@ public class FileSystemRepository implements ContentRepository { } final StandardContentClaim scc = (StandardContentClaim) claim; + if (claim.getLength() > 0) { + throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to."); + } // we always append because there may be another ContentClaim using the same resource claim. // However, we know that we will never write to the same claim from two different threads @@ -823,7 +826,6 @@ public class FileSystemRepository implements ContentRepository { initialLength = Math.max(0, scc.getLength()); } else { initialLength = 0; - scc.setOffset(claimStream.getBytesWritten()); } } @@ -936,10 +938,11 @@ public class FileSystemRepository implements ContentRepository { writableClaimStreams.put(scc.getResourceClaim(), bcos); LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this); } else { - writableClaimStreams.remove(scc.getResourceClaim()); bcos.close(); - LOG.debug("Claim length less than max; Closing {}", this); + LOG.debug( + "Claim length less than max; Closing {} because could not add back to queue", + this); if (LOG.isTraceEnabled()) { LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this)); } @@ -947,7 +950,7 @@ public class FileSystemRepository implements ContentRepository { } else { // we've reached the limit for this claim. Don't add it back to our queue. // Instead, just remove it and move on. - writableClaimStreams.remove(scc.getResourceClaim()); + // ensure that the claim is no longer on the queue writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength)); bcos.close(); http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 3e2868e..62a001c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1880,8 +1880,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public FlowFile write(final FlowFile source, final OutputStreamCallback writer) { validateRecordState(source); final StandardRepositoryRecord record = records.get(source); - long newSize = 0L; - final long claimOffset = 0L; ContentClaim newClaim = null; final LongHolder writtenHolder = new LongHolder(0L); @@ -1898,7 +1896,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } finally { recursionSet.remove(source); } - newSize = context.getContentRepository().size(newClaim); } catch (final ContentNotFoundException nfe) { resetWriteClaims(); // need to reset write claim before we can remove the claim destroyContent(newClaim); @@ -1920,7 +1917,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } removeTemporaryClaim(record); - final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build(); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getCurrent()) + .contentClaim(newClaim) + .contentClaimOffset(0) + .size(writtenHolder.getValue()) + .build(); + record.setWorking(newFile); return newFile; } @@ -2068,8 +2071,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ContentClaim currClaim = record.getCurrentClaim(); ContentClaim newClaim = null; - long newSize = 0L; - final long claimOffset = 0L; final LongHolder writtenHolder = new LongHolder(0L); try { newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); @@ -2109,8 +2110,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } } - - newSize = context.getContentRepository().size(newClaim); } catch (final ContentNotFoundException nfe) { destroyContent(newClaim); handleContentNotFound(nfe, record); @@ -2128,7 +2127,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } removeTemporaryClaim(record); - final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build(); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getCurrent()) + .contentClaim(newClaim) + .contentClaimOffset(0L) + .size(writtenHolder.getValue()) + .build(); + record.setWorking(newFile); return newFile; } @@ -2157,7 +2162,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE claimOffset = 0L; long newSize = 0L; try { - newSize = context.getContentRepository().importFrom(source, newClaim, false); + newSize = context.getContentRepository().importFrom(source, newClaim); bytesWritten.increment(newSize); bytesRead.increment(newSize); } catch (final Throwable t) { @@ -2191,7 +2196,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); - newSize = context.getContentRepository().importFrom(source, newClaim, false); + newSize = context.getContentRepository().importFrom(source, newClaim); bytesWritten.increment(newSize); } catch (final IOException e) { throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java index 753e818..62ff276 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java @@ -28,29 +28,24 @@ package org.apache.nifi.controller.repository.claim; public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> { private final ResourceClaim resourceClaim; - private final int hashCode; - private volatile long offset; + private final long offset; private volatile long length; public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) { this.resourceClaim = resourceClaim; this.offset = offset; this.length = -1L; - this.hashCode = calculateHashCode(); } public void setLength(final long length) { this.length = length; } - public void setOffset(final long offset) { - this.offset = offset; - } - - private int calculateHashCode() { + @Override + public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + hashCode; + result = prime * result; result = prime * result + (int) (length ^ length >>> 32); result = prime * result + (int) (offset ^ offset >>> 32); result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode()); @@ -58,11 +53,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont } @Override - public int hashCode() { - return this.hashCode; - } - - @Override public boolean equals(final Object obj) { if (this == obj) { return true; http://git-wip-us.apache.org/repos/asf/nifi/blob/53a6e962/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 519ba9c..5ffcb3d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -36,10 +36,8 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.util.DiskUtils; import org.apache.nifi.stream.io.StreamUtils; @@ -136,54 +134,6 @@ public class TestFileSystemRepository { assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim()); } - @Test - public void testRewriteContentClaim() throws IOException { - final ContentClaim claim1 = repository.create(false); - assertEquals(1, repository.getClaimantCount(claim1)); - - try (final OutputStream out = repository.write(claim1)) { - out.write("abc".getBytes()); - } - assertEquals(1, repository.getClaimantCount(claim1)); - - try (final OutputStream out = repository.write(claim1)) { - out.write("cba".getBytes()); - } - assertEquals(1, repository.getClaimantCount(claim1)); - - try (final InputStream in = repository.read(claim1)) { - assertEquals('c', in.read()); - assertEquals('b', in.read()); - assertEquals('a', in.read()); - } - assertEquals(1, repository.getClaimantCount(claim1)); - - assertEquals(3, repository.size(claim1)); - - final byte[] oneMB = new byte[1024 * 1024 - 6]; - new Random().nextBytes(oneMB); - try (final OutputStream out = repository.write(claim1)) { - out.write(oneMB); - } - assertEquals(1, repository.getClaimantCount(claim1)); - - assertEquals(1024 * 1024 - 6, repository.size(claim1)); - try (final InputStream in = repository.read(claim1)) { - final byte[] buff = new byte[oneMB.length]; - StreamUtils.fillBuffer(in, buff); - assertTrue(Arrays.equals(buff, oneMB)); - } - - final ResourceClaim resourceClaim = claim1.getResourceClaim(); - final Path path = rootFile.toPath().resolve(resourceClaim.getSection()).resolve(resourceClaim.getId()); - assertTrue(Files.exists(path)); - assertEquals(0, repository.decrementClaimantCount(claim1)); - assertTrue(repository.remove(claim1)); - assertFalse(Files.exists(path)); - - final ContentClaim claim2 = repository.create(false); - assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim()); - } @Test public void testWriteWithNoContent() throws IOException { @@ -292,32 +242,6 @@ public class TestFileSystemRepository { assertTrue(Arrays.equals(expected, baos.toByteArray())); } - @Test - public void testImportFromFileWithAppend() throws IOException { - final ContentClaim claim = repository.create(false); - final File hello = new File("src/test/resources/hello.txt"); - final File goodbye = new File("src/test/resources/bye.txt"); - - repository.importFrom(hello.toPath(), claim, true); - assertContentEquals(claim, "Hello, World"); - - repository.importFrom(goodbye.toPath(), claim, true); - assertContentEquals(claim, "Hello, WorldGood-Bye, World!"); - - repository.importFrom(hello.toPath(), claim, true); - assertContentEquals(claim, "Hello, WorldGood-Bye, World!Hello, World"); - - repository.importFrom(goodbye.toPath(), claim, false); - assertContentEquals(claim, "Good-Bye, World!"); - } - - private void assertContentEquals(final ContentClaim claim, final String expected) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (final InputStream in = repository.read(claim)) { - StreamUtils.copy(in, baos); - } - assertEquals(expected, new String(baos.toByteArray())); - } @Test public void testImportFromStream() throws IOException {
