Repository: nifi
Updated Branches:
  refs/heads/master 4bccab7e0 -> 00a63d17a
NIFI-5200: Fixed issue with InputStream being closed when calling ProcessSession.read() twice against sequential Content Claims

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #2753

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/00a63d17
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/00a63d17
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/00a63d17

Branch: refs/heads/master
Commit: 00a63d17af3c82727b9119acb00fccfcf6639fc5
Parents: 4bccab7
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jun 1 11:14:56 2018 -0400
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Fri Jun 8 16:47:28 2018 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java | 10 +++++-----
 .../repository/TestStandardProcessSession.java | 19 +++++++++++++++++++
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/00a63d17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 2750db6..12bcafd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2141,14 +2141,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE currentReadClaim = claim; + currentReadClaimStream = new ByteCountingInputStream(rawInStream); + StreamUtils.skip(currentReadClaimStream, offset); + // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can // reuse the same InputStream for the next FlowFile - final InputStream disableOnClose = new DisableOnCloseInputStream(rawInStream); - - currentReadClaimStream = new ByteCountingInputStream(disableOnClose); - StreamUtils.skip(currentReadClaimStream, offset); + final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream); - return currentReadClaimStream; + return disableOnClose; } else { claimCache.flush(claim); final InputStream rawInStream = context.getContentRepository().read(claim); http://git-wip-us.apache.org/repos/asf/nifi/blob/00a63d17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 55fa232..f47be4d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -350,6 +350,25 @@ public class TestStandardProcessSession { } @Test + public void testSequentialReads() throws IOException { + FlowFile 
ff1 = session.write(session.create(), out -> out.write(new byte[] {'A', 'B'})); + FlowFile ff2 = session.write(session.create(), out -> out.write('C')); + + final byte[] buff1 = new byte[2]; + try (final InputStream in = session.read(ff1)) { + StreamUtils.fillBuffer(in, buff1); + } + + final byte[] buff2 = new byte[1]; + try (final InputStream in = session.read(ff2)) { + StreamUtils.fillBuffer(in, buff2); + } + + Assert.assertArrayEquals(new byte[] {'A', 'B'}, buff1); + Assert.assertArrayEquals(new byte[] {'C'}, buff2); + } + + @Test public void testCloneOriginalDataLarger() throws IOException { final byte[] originalContent = "hello there 12345".getBytes(); final byte[] replacementContent = "NEW DATA".getBytes();