Repository: nifi
Updated Branches:
  refs/heads/master 10254a03c -> 0688363d8


NIFI-5200: Fixed bug that caused the wrong InputStream to be closed by 
StandardProcessSession if calling Session.read() from the callback of another 
Session.read(); also changed default of the 'allowSessionStreamManagement' flag 
from 'false' to 'true' as it will provide performance benefits in some cases 
and NIFI-516 outlined that it should be 'true' initially but ended up being 
false when the PR was merged.

This closes #2707

Signed-off-by: Mike Thomsen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0688363d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0688363d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0688363d

Branch: refs/heads/master
Commit: 0688363d87ecdcc68a9dd18888c64a06063c3a15
Parents: 10254a0
Author: Mark Payne <[email protected]>
Authored: Wed May 16 10:16:45 2018 -0400
Committer: Mike Thomsen <[email protected]>
Committed: Wed May 16 12:27:19 2018 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java       | 13 ++++++++-----
 .../repository/TestStandardProcessSession.java   | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0688363d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index c6e79bf..2750db6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2140,12 +2140,15 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 }
 
                 currentReadClaim = claim;
-                currentReadClaimStream = new 
ByteCountingInputStream(rawInStream);
-                StreamUtils.skip(currentReadClaimStream, offset);
 
                 // Use a non-closeable stream because we want to keep it open 
after the callback has finished so that we can
                 // reuse the same InputStream for the next FlowFile
-                return new DisableOnCloseInputStream(currentReadClaimStream);
+                final InputStream disableOnClose = new 
DisableOnCloseInputStream(rawInStream);
+
+                currentReadClaimStream = new 
ByteCountingInputStream(disableOnClose);
+                StreamUtils.skip(currentReadClaimStream, offset);
+
+                return currentReadClaimStream;
             } else {
                 claimCache.flush(claim);
                 final InputStream rawInStream = 
context.getContentRepository().read(claim);
@@ -2173,7 +2176,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     @Override
     public void read(final FlowFile source, final InputStreamCallback reader) {
-        read(source, false, reader);
+        read(source, true, reader);
     }
 
     @Override
@@ -2208,7 +2211,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 reader.process(createTaskTerminationStream(ffais));
 
                 // Allow processors to close the file after reading to avoid 
too many files open or do smart session stream management.
-                if (this.currentReadClaimStream != null && 
!allowSessionStreamManagement) {
+                if (rawIn == currentReadClaimStream && 
!allowSessionStreamManagement) {
                     currentReadClaimStream.close();
                     currentReadClaimStream = null;
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0688363d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index f6089dc..55fa232 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -331,6 +331,25 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testEmbeddedReads() {
+        FlowFile ff1 = session.write(session.create(), out -> out.write(new 
byte[] {'A', 'B'}));
+        FlowFile ff2 = session.write(session.create(), out -> out.write('C'));
+
+        session.read(ff1, in1 -> {
+            int a = in1.read();
+            assertEquals('A', a);
+
+            session.read(ff2, in2 -> {
+                int c = in2.read();
+                assertEquals('C', c);
+            });
+
+            int b = in1.read();
+            assertEquals('B', b);
+        });
+    }
+
+    @Test
     public void testCloneOriginalDataLarger() throws IOException {
         final byte[] originalContent = "hello there 12345".getBytes();
         final byte[] replacementContent = "NEW DATA".getBytes();

Reply via email to