Repository: nifi
Updated Branches:
  refs/heads/master 32b8a9b9f -> 470513fa2


NIFI-2039: Provide a new ProcessSession.read() method that provides an 
InputStream instead of using a callback

This closes #601


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/470513fa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/470513fa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/470513fa

Branch: refs/heads/master
Commit: 470513fa2ec10dbc328b80b8b72f88f1539e523c
Parents: 32b8a9b
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jul 1 13:24:12 2016 -0400
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Tue Jul 5 15:41:36 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processor/ProcessSession.java   |  68 ++++++----
 .../apache/nifi/util/MockProcessSession.java    |  65 +++++++++
 .../nifi/util/TestMockProcessSession.java       |  30 +++++
 .../repository/BatchingSessionFactory.java      |   5 +
 .../repository/StandardProcessSession.java      | 134 ++++++++++++++++++-
 .../repository/TestStandardProcessSession.java  |  86 ++++++++++--
 6 files changed, 351 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index d0c5b46..704d2cc 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -487,29 +487,51 @@ public interface ProcessSession {
      * Executes the given callback against the contents corresponding to the
      * given FlowFile.
      *
-     * <i>Note</i>: The OutputStream provided to the given OutputStreamCallback
-     * will not be accessible once this method has completed its execution.
-     *
      * @param source flowfile to retrieve content of
      * @param reader that will be called to read the flowfile content
      * @throws IllegalStateException if detected that this method is being
-     * called from within a callback of another method in this session and for
-     * the given FlowFile(s)
+     *             called from within a callback of another method in this 
session and for
+     *             the given FlowFile(s)
      * @throws FlowFileHandlingException if the given FlowFile is already
-     * transferred or removed or doesn't belong to this session. Automatic
-     * rollback will occur.
+     *             transferred or removed or doesn't belong to this session. 
Automatic
+     *             rollback will occur.
      * @throws MissingFlowFileException if the given FlowFile content cannot be
-     * found. The FlowFile should no longer be reference, will be internally
-     * destroyed, and the session is automatically rolled back and what is left
-     * of the FlowFile is destroyed.
+     *             found. The FlowFile should no longer be referenced, will be 
internally
+     *             destroyed, and the session is automatically rolled back and 
what is left
+     *             of the FlowFile is destroyed.
      * @throws FlowFileAccessException if some IO problem occurs accessing
-     * FlowFile content; if an attempt is made to access the InputStream
-     * provided to the given InputStreamCallback after this method completed 
its
-     * execution
+     *             FlowFile content; if an attempt is made to access the 
InputStream
+     *             provided to the given InputStreamCallback after this method 
completed its
+     *             execution
      */
     void read(FlowFile source, InputStreamCallback reader) throws 
FlowFileAccessException;
 
     /**
+     * Provides an InputStream that can be used to read the contents of the 
given FlowFile.
+     * This method differs from those that make use of callbacks in that this 
method returns
+     * an InputStream and expects the caller to properly handle the lifecycle 
of the InputStream
+     * (i.e., the caller is responsible for ensuring that the InputStream is 
closed appropriately).
+     * The Process Session may or may not handle closing the stream when 
{@link #commit()} or {@link #rollback()}
+     * is called, but the responsibility of doing so belongs to the caller. 
The InputStream will throw
+     * an IOException if an attempt is made to read from the stream after the 
session is committed or
+     * rolled back.
+     *
+     * @param flowFile the FlowFile to read
+     * @return an InputStream that can be used to read the contents of the 
FlowFile
+     * @throws IllegalStateException if detected that this method is being
+     *             called from within a callback of another method in this 
session and for
+     *             the given FlowFile(s)
+     * @throws FlowFileHandlingException if the given FlowFile is already
+     *             transferred or removed or doesn't belong to this session. 
Automatic
+     *             rollback will occur.
+     * @throws MissingFlowFileException if the given FlowFile content cannot be
+     *             found. The FlowFile should no longer be referenced, will be 
internally
+     *             destroyed, and the session is automatically rolled back and 
what is left
+     *             of the FlowFile is destroyed.
+     */
+    InputStream read(FlowFile flowFile);
+
+    /**
      * Executes the given callback against the contents corresponding to the
      * given FlowFile.
      *
@@ -520,19 +542,19 @@ public interface ProcessSession {
      * @param allowSessionStreamManagement allow session to hold the stream 
open for performance reasons
      * @param reader that will be called to read the flowfile content
      * @throws IllegalStateException if detected that this method is being
-     * called from within a callback of another method in this session and for
-     * the given FlowFile(s)
+     *             called from within a callback of another method in this 
session and for
+     *             the given FlowFile(s)
      * @throws FlowFileHandlingException if the given FlowFile is already
-     * transferred or removed or doesn't belong to this session. Automatic
-     * rollback will occur.
+     *             transferred or removed or doesn't belong to this session. 
Automatic
+     *             rollback will occur.
      * @throws MissingFlowFileException if the given FlowFile content cannot be
-     * found. The FlowFile should no longer be reference, will be internally
-     * destroyed, and the session is automatically rolled back and what is left
-     * of the FlowFile is destroyed.
+     *             found. The FlowFile should no longer be reference, will be 
internally
+     *             destroyed, and the session is automatically rolled back and 
what is left
+     *             of the FlowFile is destroyed.
      * @throws FlowFileAccessException if some IO problem occurs accessing
-     * FlowFile content; if an attempt is made to access the InputStream
-     * provided to the given InputStreamCallback after this method completed 
its
-     * execution
+     *             FlowFile content; if an attempt is made to access the 
InputStream
+     *             provided to the given InputStreamCallback after this method 
completed its
+     *             execution
      */
     void read(FlowFile source, boolean allowSessionStreamManagement, 
InputStreamCallback reader) throws FlowFileAccessException;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 2b22761..66db49a 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -69,6 +69,9 @@ public class MockProcessSession implements ProcessSession {
     private final Map<String, Long> counterMap = new HashMap<>();
     private final MockProvenanceReporter provenanceReporter;
 
+    // A List of InputStreams that have been created by calls to {@link 
#read(FlowFile)} and have not yet been closed.
+    private final List<InputStream> openInputStreams = new ArrayList<>();
+
     private boolean committed = false;
     private boolean rolledback = false;
     private int removedCount = 0;
@@ -127,6 +130,20 @@ public class MockProcessSession implements ProcessSession {
         if (!beingProcessed.isEmpty()) {
             throw new FlowFileHandlingException("Cannot commit session because 
the following FlowFiles have not been removed or transferred: " + 
beingProcessed);
         }
+
+        if (!openInputStreams.isEmpty()) {
+            final List<InputStream> openStreamCopy = new 
ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by 
creating a copy of the List
+            for (final InputStream openInputStream : openStreamCopy) {
+                try {
+                    openInputStream.close();
+                } catch (final IOException e) {
+                }
+            }
+
+            throw new FlowFileHandlingException("Cannot commit session because 
the following Input Streams were created via "
+                + "calls to ProcessSession.read(FlowFile) and never closed: " 
+ openStreamCopy);
+        }
+
         committed = true;
         beingProcessed.clear();
         currentVersions.clear();
@@ -428,6 +445,46 @@ public class MockProcessSession implements ProcessSession {
     }
 
     @Override
+    public InputStream read(final FlowFile flowFile) {
+        if (flowFile == null) {
+            throw new IllegalArgumentException("FlowFile cannot be null");
+        }
+
+        validateState(flowFile);
+        if (!(flowFile instanceof MockFlowFile)) {
+            throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
+        }
+        final MockFlowFile mock = (MockFlowFile) flowFile;
+
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(mock.getData());
+        final InputStream errorHandlingStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return bais.read();
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                return bais.read(b, off, len);
+            }
+
+            @Override
+            public void close() throws IOException {
+                openInputStreams.remove(this);
+                bais.close();
+            }
+
+            @Override
+            public String toString() {
+                return "ErrorHandlingInputStream[flowFile=" + flowFile + "]";
+            }
+        };
+
+        openInputStreams.add(errorHandlingStream);
+        return errorHandlingStream;
+    }
+
+    @Override
     public void remove(final FlowFile flowFile) {
         validateState(flowFile);
 
@@ -531,6 +588,14 @@ public class MockProcessSession implements ProcessSession {
 
     @Override
     public void rollback(final boolean penalize) {
+        final List<InputStream> openStreamCopy = new 
ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by 
creating a copy of the List
+        for (final InputStream openInputStream : openStreamCopy) {
+            try {
+                openInputStream.close();
+            } catch (final IOException e) {
+            }
+        }
+
         for (final List<MockFlowFile> list : transferMap.values()) {
             for (final MockFlowFile flowFile : list) {
                 processorQueue.offer(flowFile);

http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index e728072..2d88351 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -20,12 +20,21 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileHandlingException;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class TestMockProcessSession {
 
@@ -34,6 +43,27 @@ public class TestMockProcessSession {
         TestRunners.newTestRunner(PoorlyBehavedProcessor.class).run();
     }
 
+    @Test
+    public void testReadWithoutCloseThrowsExceptionOnCommit() throws 
IOException {
+        final Processor processor = new PoorlyBehavedProcessor();
+        final MockProcessSession session = new MockProcessSession(new 
SharedSessionState(processor, new AtomicLong(0L)), processor);
+        FlowFile flowFile = session.createFlowFile("hello, world".getBytes());
+        final InputStream in = session.read(flowFile);
+        final byte[] buffer = new byte[12];
+        StreamUtils.fillBuffer(in, buffer);
+
+        assertEquals("hello, world", new String(buffer));
+
+        session.remove(flowFile);
+
+        try {
+            session.commit();
+            Assert.fail("Was able to commit session without closing 
InputStream");
+        } catch (final FlowFileHandlingException ffhe) {
+            System.out.println(ffhe.toString());
+        }
+    }
+
     protected static class PoorlyBehavedProcessor extends AbstractProcessor {
 
         private static final Relationship REL_FAILURE = new 
Relationship.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index bac45af..63b89eb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -193,6 +193,11 @@ public class BatchingSessionFactory implements 
ProcessSessionFactory {
         }
 
         @Override
+        public InputStream read(FlowFile flowFile) {
+            return session.read(flowFile);
+        }
+
+        @Override
         public FlowFile merge(Collection<FlowFile> sources, FlowFile 
destination) {
             return session.merge(sources, destination);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index f6eca71..6ea5afe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -129,6 +129,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     private ByteCountingInputStream currentReadClaimStream = null;
     private long processingStartTime;
 
+    // List of InputStreams that have been opened by calls to {@link 
#read(FlowFile)} and not yet closed
+    private final List<InputStream> openInputStreams = new ArrayList<>();
+
     // maps a FlowFile to all Provenance Events that were generated for that 
FlowFile.
     // we do this so that if we generate a Fork event, for example, and then 
remove the event in the same
     // Session, we will not send that event to the Provenance Repository
@@ -184,6 +187,18 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     public void checkpoint() {
         resetWriteClaims(false);
 
+        final List<InputStream> openStreamCopy = new 
ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by 
creating a copy of the List
+        for (final InputStream openStream : openStreamCopy) {
+            LOG.warn("{} closing {} for {} because the session was committed 
without the stream being closed.", this, openStream, 
this.connectableDescription);
+
+            try {
+                openStream.close();
+            } catch (final Exception e) {
+                LOG.warn("{} Attempted to close {} for {} due to session 
commit but close failed", this, openStream, this.connectableDescription);
+                LOG.warn("", e);
+            }
+        }
+
         if (!recursionSet.isEmpty()) {
             throw new IllegalStateException();
         }
@@ -870,6 +885,17 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         deleteOnCommit.clear();
 
+        final List<InputStream> openStreamCopy = new 
ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by 
creating a copy of the List
+        for (final InputStream openStream : openStreamCopy) {
+            LOG.debug("{} closing {} for {} due to session rollback", this, 
openStream, this.connectableDescription);
+            try {
+                openStream.close();
+            } catch (final Exception e) {
+                LOG.warn("{} Attempted to close {} for {} due to session 
rollback but close failed", this, openStream, this.connectableDescription);
+                LOG.warn("", e);
+            }
+        }
+
         final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>();
         recordsToHandle.addAll(records.values());
         if (rollbackCheckpoint) {
@@ -1761,7 +1787,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
     }
 
-    private InputStream getInputStream(final FlowFile flowFile, final 
ContentClaim claim, final long offset) throws ContentNotFoundException {
+    private InputStream getInputStream(final FlowFile flowFile, final 
ContentClaim claim, final long offset, final boolean allowCachingOfStream) 
throws ContentNotFoundException {
         // If there's no content, don't bother going to the Content Repository 
because it is generally expensive and we know
         // that there is no actual content.
         if (flowFile.getSize() == 0L) {
@@ -1772,7 +1798,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             // If the recursion set is empty, we can use the same input stream 
that we already have open. However, if
             // the recursion set is NOT empty, we can't do this because we may 
be reading the input of FlowFile 1 while in the
             // callback for reading FlowFile 1 and if we used the same stream 
we'd be destroying the ability to read from FlowFile 1.
-            if (recursionSet.isEmpty()) {
+            if (allowCachingOfStream && recursionSet.isEmpty()) {
                 if (currentReadClaim == claim) {
                     if (currentReadClaimStream != null && 
currentReadClaimStream.getStreamLocation() <= offset) {
                         final long bytesToSkip = offset - 
currentReadClaimStream.getStreamLocation();
@@ -1832,7 +1858,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             throw new FlowFileAccessException("Failed to access ContentClaim 
for " + source.toString(), e);
         }
 
-        try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset());
+        try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
             final InputStream limitedIn = new LimitedInputStream(rawIn, 
source.getSize());
             final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
             final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
@@ -1874,6 +1900,102 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     @Override
+    public InputStream read(final FlowFile source) {
+        validateRecordState(source);
+        final StandardRepositoryRecord record = records.get(source);
+
+        try {
+            ensureNotAppending(record.getCurrentClaim());
+        } catch (final IOException e) {
+            throw new FlowFileAccessException("Failed to access ContentClaim 
for " + source.toString(), e);
+        }
+
+        final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset(), false);
+        final InputStream limitedIn = new LimitedInputStream(rawIn, 
source.getSize());
+        final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(limitedIn, this.bytesRead);
+        final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+
+        final InputStream errorHandlingStream = new InputStream() {
+
+            @Override
+            public int read() throws IOException {
+                try {
+                    return ffais.read();
+                } catch (final ContentNotFoundException cnfe) {
+                    handleContentNotFound(cnfe, record);
+                    close();
+                    throw cnfe;
+                } catch (final FlowFileAccessException ffae) {
+                    LOG.error("Failed to read content from " + source + "; 
rolling back session", ffae);
+                    rollback(true);
+                    close();
+                    throw ffae;
+                }
+            }
+
+            @Override
+            public int read(final byte[] b) throws IOException {
+                return read(b, 0, b.length);
+            }
+
+            @Override
+            public int read(final byte[] b, final int off, final int len) 
throws IOException {
+                try {
+                    return ffais.read(b, off, len);
+                } catch (final ContentNotFoundException cnfe) {
+                    handleContentNotFound(cnfe, record);
+                    close();
+                    throw cnfe;
+                } catch (final FlowFileAccessException ffae) {
+                    LOG.error("Failed to read content from " + source + "; 
rolling back session", ffae);
+                    rollback(true);
+                    close();
+                    throw ffae;
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                ffais.close();
+                openInputStreams.remove(this);
+            }
+
+            @Override
+            public int available() throws IOException {
+                return ffais.available();
+            }
+
+            @Override
+            public long skip(long n) throws IOException {
+                return ffais.skip(n);
+            }
+
+            @Override
+            public boolean markSupported() {
+                return ffais.markSupported();
+            }
+
+            @Override
+            public synchronized void mark(int readlimit) {
+                ffais.mark(readlimit);
+            }
+
+            @Override
+            public synchronized void reset() throws IOException {
+                ffais.reset();
+            }
+
+            @Override
+            public String toString() {
+                return "ErrorHandlingInputStream[FlowFile=" + source + "]";
+            }
+        };
+
+        openInputStreams.add(errorHandlingStream);
+        return errorHandlingStream;
+    }
+
+    @Override
     public FlowFile merge(final Collection<FlowFile> sources, final FlowFile 
destination) {
         return merge(sources, destination, null, null, null);
     }
@@ -2193,7 +2315,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
             ensureNotAppending(newClaim);
 
-            try (final InputStream is = getInputStream(source, currClaim, 
record.getCurrentClaimOffset());
+            try (final InputStream is = getInputStream(source, currClaim, 
record.getCurrentClaimOffset(), true);
                 final InputStream limitedIn = new LimitedInputStream(is, 
source.getSize());
                 final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
                 final InputStream countingIn = new 
ByteCountingInputStream(disableOnCloseIn, bytesRead);
@@ -2362,7 +2484,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             throw new FlowFileAccessException("Failed to access ContentClaim 
for " + source.toString(), e);
         }
 
-        try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset());
+        try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
                 final InputStream limitedIn = new LimitedInputStream(rawIn, 
source.getSize());
                 final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
                 final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
@@ -2428,7 +2550,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     private void validateRecordState(final FlowFile... flowFiles) {
         for (final FlowFile file : flowFiles) {
             if (recursionSet.contains(file)) {
-                throw new IllegalStateException(file + " already in use for an 
active callback");
+                throw new IllegalStateException(file + " already in use for an 
active callback or InputStream created by ProcessSession.read(FlowFile) has not 
been closed");
             }
             final StandardRepositoryRecord record = records.get(file);
             if (record == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 4ae2080..55c9f5a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -20,10 +20,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -50,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import org.apache.nifi.connectable.Connectable;
@@ -79,7 +80,6 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.ObjectHolder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -318,7 +318,7 @@ public class TestStandardProcessSession {
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
-        final ObjectHolder<OutputStream> outputStreamHolder = new 
ObjectHolder<>(null);
+        final AtomicReference<OutputStream> outputStreamHolder = new 
AtomicReference<>(null);
         flowFile = session.append(flowFile, new OutputStreamCallback() {
             @Override
             public void process(final OutputStream outputStream) throws 
IOException {
@@ -374,7 +374,7 @@ public class TestStandardProcessSession {
         flowFileQueue.put(flowFileRecord);
         final FlowFile flowFile = session.get();
         assertNotNull(flowFile);
-        final ObjectHolder<InputStream> inputStreamHolder = new 
ObjectHolder<>(null);
+        final AtomicReference<InputStream> inputStreamHolder = new 
AtomicReference<>(null);
         session.read(flowFile, true , new InputStreamCallback() {
             @Override
             public void process(final InputStream inputStream) throws 
IOException {
@@ -395,8 +395,8 @@ public class TestStandardProcessSession {
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
-        final ObjectHolder<InputStream> inputStreamHolder = new 
ObjectHolder<>(null);
-        final ObjectHolder<OutputStream> outputStreamHolder = new 
ObjectHolder<>(null);
+        final AtomicReference<InputStream> inputStreamHolder = new 
AtomicReference<>(null);
+        final AtomicReference<OutputStream> outputStreamHolder = new 
AtomicReference<>(null);
         flowFile = session.write(flowFile, new StreamCallback() {
             @Override
             public void process(final InputStream input, final OutputStream 
output) throws IOException {
@@ -419,7 +419,7 @@ public class TestStandardProcessSession {
         flowFileQueue.put(flowFileRecord);
         FlowFile flowFile = session.get();
         assertNotNull(flowFile);
-        final ObjectHolder<OutputStream> outputStreamHolder = new 
ObjectHolder<>(null);
+        final AtomicReference<OutputStream> outputStreamHolder = new 
AtomicReference<>(null);
         flowFile = session.write(flowFile, new OutputStreamCallback() {
             @Override
             public void process(final OutputStream out) throws IOException {
@@ -1180,6 +1180,76 @@ public class TestStandardProcessSession {
         assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, 
event.getEventType());
     }
 
+    @Test
+    public void testReadFromInputStream() throws IOException {
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write("hello, world".getBytes());
+            }
+        });
+
+        try (InputStream in = session.read(flowFile)) {
+            final byte[] buffer = new byte[12];
+            StreamUtils.fillBuffer(in, buffer);
+            assertEquals("hello, world", new String(buffer));
+        }
+
+        session.remove(flowFile);
+        session.commit();
+    }
+
+    @Test
+    public void testReadFromInputStreamWithoutClosingThenRemove() throws 
IOException {
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write("hello, world".getBytes());
+            }
+        });
+
+        InputStream in = session.read(flowFile);
+        final byte[] buffer = new byte[12];
+        StreamUtils.fillBuffer(in, buffer);
+        assertEquals("hello, world", new String(buffer));
+
+        session.remove(flowFile);
+        session.commit(); // This should generate a WARN log message. We can't 
really test this in a unit test but can verify manually.
+    }
+
+    @Test
+    public void testOpenMultipleInputStreamsToFlowFile() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+        InputStream in = session.read(flowFile);
+        final byte[] buffer = new byte[12];
+        StreamUtils.fillBuffer(in, buffer);
+        assertEquals("hello, world", new String(buffer));
+
+        InputStream in2 = session.read(flowFile);
+        StreamUtils.fillBuffer(in2, buffer);
+        assertEquals("hello, world", new String(buffer));
+
+        in.close();
+        in2.close();
+        session.remove(flowFile);
+        session.commit();
+    }
+
     private static class MockFlowFileRepository implements FlowFileRepository {
         private boolean failOnUpdate = false;
         private final AtomicLong idGenerator = new AtomicLong(0L);

Reply via email to