Repository: nifi Updated Branches: refs/heads/master 32b8a9b9f -> 470513fa2
NIFI-2039: Provide a new ProcessSession.read() method that provides an InputStream instead of using a callback This closes #601 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/470513fa Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/470513fa Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/470513fa Branch: refs/heads/master Commit: 470513fa2ec10dbc328b80b8b72f88f1539e523c Parents: 32b8a9b Author: Mark Payne <marka...@hotmail.com> Authored: Fri Jul 1 13:24:12 2016 -0400 Committer: Matt Burgess <mattyb...@apache.org> Committed: Tue Jul 5 15:41:36 2016 -0400 ---------------------------------------------------------------------- .../apache/nifi/processor/ProcessSession.java | 68 ++++++---- .../apache/nifi/util/MockProcessSession.java | 65 +++++++++ .../nifi/util/TestMockProcessSession.java | 30 +++++ .../repository/BatchingSessionFactory.java | 5 + .../repository/StandardProcessSession.java | 134 ++++++++++++++++++- .../repository/TestStandardProcessSession.java | 86 ++++++++++-- 6 files changed, 351 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index d0c5b46..704d2cc 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -487,29 +487,51 @@ public interface ProcessSession { * Executes the given callback against the contents corresponding to the * given FlowFile. * - * <i>Note</i>: The OutputStream provided to the given OutputStreamCallback - * will not be accessible once this method has completed its execution. - * * @param source flowfile to retrieve content of * @param reader that will be called to read the flowfile content * @throws IllegalStateException if detected that this method is being - * called from within a callback of another method in this session and for - * the given FlowFile(s) + * called from within a callback of another method in this session and for + * the given FlowFile(s) * @throws FlowFileHandlingException if the given FlowFile is already - * transferred or removed or doesn't belong to this session. Automatic - * rollback will occur. + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. * @throws MissingFlowFileException if the given FlowFile content cannot be - * found. The FlowFile should no longer be reference, will be internally - * destroyed, and the session is automatically rolled back and what is left - * of the FlowFile is destroyed. + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. * @throws FlowFileAccessException if some IO problem occurs accessing - * FlowFile content; if an attempt is made to access the InputStream - * provided to the given InputStreamCallback after this method completed its - * execution + * FlowFile content; if an attempt is made to access the InputStream + * provided to the given InputStreamCallback after this method completed its + * execution */ void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException; /** + * Provides an InputStream that can be used to read the contents of the given FlowFile. + * This method differs from those that make use of callbacks in that this method returns + * an InputStream and expects the caller to properly handle the lifecycle of the InputStream + * (i.e., the caller is responsible for ensuring that the InputStream is closed appropriately). + * The Process Session may or may not handle closing the stream when {@link #commit()} or {@link #rollback()} + * is called, but the responsibility of doing so belongs to the caller. The InputStream will throw + * an IOException if an attempt is made to read from the stream after the session is committed or + * rolled back. + * + * @param flowFile the FlowFile to read + * @return an InputStream that can be used to read the contents of the FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + */ + InputStream read(FlowFile flowFile); + + /** * Executes the given callback against the contents corresponding to the * given FlowFile. * @@ -520,19 +542,19 @@ public interface ProcessSession { * @param allowSessionStreamManagement allow session to hold the stream open for performance reasons * @param reader that will be called to read the flowfile content * @throws IllegalStateException if detected that this method is being - * called from within a callback of another method in this session and for - * the given FlowFile(s) + * called from within a callback of another method in this session and for + * the given FlowFile(s) * @throws FlowFileHandlingException if the given FlowFile is already - * transferred or removed or doesn't belong to this session. Automatic - * rollback will occur. + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. * @throws MissingFlowFileException if the given FlowFile content cannot be - * found. The FlowFile should no longer be reference, will be internally - * destroyed, and the session is automatically rolled back and what is left - * of the FlowFile is destroyed. + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. * @throws FlowFileAccessException if some IO problem occurs accessing - * FlowFile content; if an attempt is made to access the InputStream - * provided to the given InputStreamCallback after this method completed its - * execution + * FlowFile content; if an attempt is made to access the InputStream + * provided to the given InputStreamCallback after this method completed its + * execution */ void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException; http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 2b22761..66db49a 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -69,6 +69,9 @@ public class MockProcessSession implements ProcessSession { private final Map<String, Long> counterMap = new HashMap<>(); private final MockProvenanceReporter provenanceReporter; + // A List of InputStreams that have been created by calls to {@link #read(FlowFile)} and have not yet been closed. + private final List<InputStream> openInputStreams = new ArrayList<>(); + private boolean committed = false; private boolean rolledback = false; private int removedCount = 0; @@ -127,6 +130,20 @@ public class MockProcessSession implements ProcessSession { if (!beingProcessed.isEmpty()) { throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed); } + + if (!openInputStreams.isEmpty()) { + final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List + for (final InputStream openInputStream : openStreamCopy) { + try { + openInputStream.close(); + } catch (final IOException e) { + } + } + + throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via " + + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy); + } + committed = true; beingProcessed.clear(); currentVersions.clear(); @@ -428,6 +445,46 @@ public class MockProcessSession implements ProcessSession { } @Override + public InputStream read(final FlowFile flowFile) { + if (flowFile == null) { + throw new IllegalArgumentException("FlowFile cannot be null"); + } + + validateState(flowFile); + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); + final InputStream errorHandlingStream = new InputStream() { + @Override + public int read() throws IOException { + return bais.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return bais.read(b, off, len); + } + + @Override + public void close() throws IOException { + openInputStreams.remove(this); + bais.close(); + } + + @Override + public String toString() { + return "ErrorHandlingInputStream[flowFile=" + flowFile + "]"; + } + }; + + openInputStreams.add(errorHandlingStream); + return errorHandlingStream; + } + + @Override public void remove(final FlowFile flowFile) { validateState(flowFile); @@ -531,6 +588,14 @@ public class MockProcessSession implements ProcessSession { @Override public void rollback(final boolean penalize) { + final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List + for (final InputStream openInputStream : openStreamCopy) { + try { + openInputStream.close(); + } catch (final IOException e) { + } + } + for (final List<MockFlowFile> list : transferMap.values()) { for (final MockFlowFile flowFile : list) { processorQueue.offer(flowFile); http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java index e728072..2d88351 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -20,12 +20,21 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileHandlingException; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.stream.io.StreamUtils; +import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.InputStream; import java.util.Collections; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; public class TestMockProcessSession { @@ -34,6 +43,27 @@ public class TestMockProcessSession { TestRunners.newTestRunner(PoorlyBehavedProcessor.class).run(); } + @Test + public void testReadWithoutCloseThrowsExceptionOnCommit() throws IOException { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor); + FlowFile flowFile = session.createFlowFile("hello, world".getBytes()); + final InputStream in = session.read(flowFile); + final byte[] buffer = new byte[12]; + StreamUtils.fillBuffer(in, buffer); + + assertEquals("hello, world", new String(buffer)); + + session.remove(flowFile); + + try { + session.commit(); + Assert.fail("Was able to commit session without closing InputStream"); + } catch (final FlowFileHandlingException ffhe) { + System.out.println(ffhe.toString()); + } + } + protected static class PoorlyBehavedProcessor extends AbstractProcessor { private static final Relationship REL_FAILURE = new Relationship.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java index bac45af..63b89eb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java @@ -193,6 +193,11 @@ public class BatchingSessionFactory implements ProcessSessionFactory { } @Override + public InputStream read(FlowFile flowFile) { + return session.read(flowFile); + } + + @Override public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) { return session.merge(sources, destination); } http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index f6eca71..6ea5afe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -129,6 +129,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private ByteCountingInputStream currentReadClaimStream = null; private long processingStartTime; + // List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed + private final List<InputStream> openInputStreams = new ArrayList<>(); + // maps a FlowFile to all Provenance Events that were generated for that FlowFile. // we do this so that if we generate a Fork event, for example, and then remove the event in the same // Session, we will not send that event to the Provenance Repository @@ -184,6 +187,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public void checkpoint() { resetWriteClaims(false); + final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List + for (final InputStream openStream : openStreamCopy) { + LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, this.connectableDescription); + + try { + openStream.close(); + } catch (final Exception e) { + LOG.warn("{} Attempted to close {} for {} due to session commit but close failed", this, openStream, this.connectableDescription); + LOG.warn("", e); + } + } + if (!recursionSet.isEmpty()) { throw new IllegalStateException(); } @@ -870,6 +885,17 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE deleteOnCommit.clear(); + final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List + for (final InputStream openStream : openStreamCopy) { + LOG.debug("{} closing {} for {} due to session rollback", this, openStream, this.connectableDescription); + try { + openStream.close(); + } catch (final Exception e) { + LOG.warn("{} Attempted to close {} for {} due to session rollback but close failed", this, openStream, this.connectableDescription); + LOG.warn("", e); + } + } + final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>(); recordsToHandle.addAll(records.values()); if (rollbackCheckpoint) { @@ -1761,7 +1787,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } - private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset) throws ContentNotFoundException { + private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset, final boolean allowCachingOfStream) throws ContentNotFoundException { // If there's no content, don't bother going to the Content Repository because it is generally expensive and we know // that there is no actual content. if (flowFile.getSize() == 0L) { @@ -1772,7 +1798,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // If the recursion set is empty, we can use the same input stream that we already have open. However, if // the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1. - if (recursionSet.isEmpty()) { + if (allowCachingOfStream && recursionSet.isEmpty()) { if (currentReadClaim == claim) { if (currentReadClaimStream != null && currentReadClaimStream.getStreamLocation() <= offset) { final long bytesToSkip = offset - currentReadClaimStream.getStreamLocation(); @@ -1832,7 +1858,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e); } - try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); + try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { @@ -1874,6 +1900,102 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override + public InputStream read(final FlowFile source) { + validateRecordState(source); + final StandardRepositoryRecord record = records.get(source); + + try { + ensureNotAppending(record.getCurrentClaim()); + } catch (final IOException e) { + throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e); + } + + final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false); + final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead); + final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim()); + + final InputStream errorHandlingStream = new InputStream() { + + @Override + public int read() throws IOException { + try { + return ffais.read(); + } catch (final ContentNotFoundException cnfe) { + handleContentNotFound(cnfe, record); + close(); + throw cnfe; + } catch (final FlowFileAccessException ffae) { + LOG.error("Failed to read content from " + source + "; rolling back session", ffae); + rollback(true); + close(); + throw ffae; + } + } + + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + try { + return ffais.read(b, off, len); + } catch (final ContentNotFoundException cnfe) { + handleContentNotFound(cnfe, record); + close(); + throw cnfe; + } catch (final FlowFileAccessException ffae) { + LOG.error("Failed to read content from " + source + "; rolling back session", ffae); + rollback(true); + close(); + throw ffae; + } + } + + @Override + public void close() throws IOException { + ffais.close(); + openInputStreams.remove(this); + } + + @Override + public int available() throws IOException { + return ffais.available(); + } + + @Override + public long skip(long n) throws IOException { + return ffais.skip(n); + } + + @Override + public boolean markSupported() { + return ffais.markSupported(); + } + + @Override + public synchronized void mark(int readlimit) { + ffais.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + ffais.reset(); + } + + @Override + public String toString() { + return "ErrorHandlingInputStream[FlowFile=" + source + "]"; + } + }; + + openInputStreams.add(errorHandlingStream); + return errorHandlingStream; + } + + @Override public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) { return merge(sources, destination, null, null, null); } @@ -2193,7 +2315,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); - try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset()); + try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset(), true); final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); @@ -2362,7 +2484,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e); } - try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); + try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { @@ -2428,7 +2550,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private void validateRecordState(final FlowFile... flowFiles) { for (final FlowFile file : flowFiles) { if (recursionSet.contains(file)) { - throw new IllegalStateException(file + " already in use for an active callback"); + throw new IllegalStateException(file + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed"); } final StandardRepositoryRecord record = records.get(file); if (record == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/470513fa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 4ae2080..55c9f5a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -20,10 +20,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.doThrow; import static org.mockito.Matchers.any; import static org.mockito.Matchers.notNull; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -50,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.nifi.connectable.Connectable; @@ -79,7 +80,6 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.ObjectHolder; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -318,7 +318,7 @@ public class TestStandardProcessSession { flowFileQueue.put(flowFileRecord); FlowFile flowFile = session.get(); assertNotNull(flowFile); - final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null); + final AtomicReference<OutputStream> outputStreamHolder = new AtomicReference<>(null); flowFile = session.append(flowFile, new OutputStreamCallback() { @Override public void process(final OutputStream outputStream) throws IOException { @@ -374,7 +374,7 @@ public class TestStandardProcessSession { flowFileQueue.put(flowFileRecord); final FlowFile flowFile = session.get(); assertNotNull(flowFile); - final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null); + final AtomicReference<InputStream> inputStreamHolder = new AtomicReference<>(null); session.read(flowFile, true , new InputStreamCallback() { @Override public void process(final InputStream inputStream) throws IOException { @@ -395,8 +395,8 @@ public class TestStandardProcessSession { flowFileQueue.put(flowFileRecord); FlowFile flowFile = session.get(); assertNotNull(flowFile); - final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null); - final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null); + final AtomicReference<InputStream> inputStreamHolder = new AtomicReference<>(null); + final AtomicReference<OutputStream> outputStreamHolder = new AtomicReference<>(null); flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream input, final OutputStream output) throws IOException { @@ -419,7 +419,7 @@ public class TestStandardProcessSession { flowFileQueue.put(flowFileRecord); FlowFile flowFile = session.get(); assertNotNull(flowFile); - final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null); + final AtomicReference<OutputStream> outputStreamHolder = new AtomicReference<>(null); flowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { @@ -1180,6 +1180,76 @@ public class TestStandardProcessSession { assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType()); } + @Test + public void testReadFromInputStream() throws IOException { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write("hello, world".getBytes()); + } + }); + + try (InputStream in = session.read(flowFile)) { + final byte[] buffer = new byte[12]; + StreamUtils.fillBuffer(in, buffer); + assertEquals("hello, world", new String(buffer)); + } + + session.remove(flowFile); + session.commit(); + } + + @Test + public void testReadFromInputStreamWithoutClosingThenRemove() throws IOException { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write("hello, world".getBytes()); + } + }); + + InputStream in = session.read(flowFile); + final byte[] buffer = new byte[12]; + StreamUtils.fillBuffer(in, buffer); + assertEquals("hello, world", new String(buffer)); + + session.remove(flowFile); + session.commit(); // This should generate a WARN log message. We can't really test this in a unit test but can verify manually. + } + + @Test + public void testOpenMultipleInputStreamsToFlowFile() throws IOException { + final ContentClaim claim = contentRepo.create(false); + try (final OutputStream out = contentRepo.write(claim)) { + out.write("hello, world".getBytes()); + } + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(12L) + .build(); + flowFileQueue.put(flowFileRecord); + + final FlowFile flowFile = session.get(); + InputStream in = session.read(flowFile); + final byte[] buffer = new byte[12]; + StreamUtils.fillBuffer(in, buffer); + assertEquals("hello, world", new String(buffer)); + + InputStream in2 = session.read(flowFile); + StreamUtils.fillBuffer(in2, buffer); + assertEquals("hello, world", new String(buffer)); + + in.close(); + in2.close(); + session.remove(flowFile); + session.commit(); + } + private static class MockFlowFileRepository implements FlowFileRepository { private boolean failOnUpdate = false; private final AtomicLong idGenerator = new AtomicLong(0L);