Repository: nifi Updated Branches: refs/heads/master 10254a03c -> 0688363d8
NIFI-5200: Fixed bug that caused the wrong InputStream to be closed by StandardProcessSession if calling Session.read() from the callback of another Session.read(); also changed default of the 'allowSessionStreamManagement' flag from 'false' to 'true' as it will provide performance benefits in some cases and NIFI-516 outlined that it should be 'true' initially but ended up being false when the PR was merged. This closes #2707 Signed-off-by: Mike Thomsen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0688363d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0688363d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0688363d Branch: refs/heads/master Commit: 0688363d87ecdcc68a9dd18888c64a06063c3a15 Parents: 10254a0 Author: Mark Payne <[email protected]> Authored: Wed May 16 10:16:45 2018 -0400 Committer: Mike Thomsen <[email protected]> Committed: Wed May 16 12:27:19 2018 -0400 ---------------------------------------------------------------------- .../repository/StandardProcessSession.java | 13 ++++++++----- .../repository/TestStandardProcessSession.java | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0688363d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index c6e79bf..2750db6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2140,12 +2140,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } currentReadClaim = claim; - currentReadClaimStream = new ByteCountingInputStream(rawInStream); - StreamUtils.skip(currentReadClaimStream, offset); // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can // reuse the same InputStream for the next FlowFile - return new DisableOnCloseInputStream(currentReadClaimStream); + final InputStream disableOnClose = new DisableOnCloseInputStream(rawInStream); + + currentReadClaimStream = new ByteCountingInputStream(disableOnClose); + StreamUtils.skip(currentReadClaimStream, offset); + + return currentReadClaimStream; } else { claimCache.flush(claim); final InputStream rawInStream = context.getContentRepository().read(claim); @@ -2173,7 +2176,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void read(final FlowFile source, final InputStreamCallback reader) { - read(source, false, reader); + read(source, true, reader); } @Override @@ -2208,7 +2211,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE reader.process(createTaskTerminationStream(ffais)); // Allow processors to close the file after reading to avoid too many files open or do smart session stream management. - if (this.currentReadClaimStream != null && !allowSessionStreamManagement) { + if (rawIn == currentReadClaimStream && !allowSessionStreamManagement) { currentReadClaimStream.close(); currentReadClaimStream = null; } http://git-wip-us.apache.org/repos/asf/nifi/blob/0688363d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index f6089dc..55fa232 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -331,6 +331,25 @@ public class TestStandardProcessSession { } @Test + public void testEmbeddedReads() { + FlowFile ff1 = session.write(session.create(), out -> out.write(new byte[] {'A', 'B'})); + FlowFile ff2 = session.write(session.create(), out -> out.write('C')); + + session.read(ff1, in1 -> { + int a = in1.read(); + assertEquals('A', a); + + session.read(ff2, in2 -> { + int c = in2.read(); + assertEquals('C', c); + }); + + int b = in1.read(); + assertEquals('B', b); + }); + } + + @Test public void testCloneOriginalDataLarger() throws IOException { final byte[] originalContent = "hello there 12345".getBytes(); final byte[] replacementContent = "NEW DATA".getBytes();
