Repository: nifi Updated Branches: refs/heads/master 1c1738670 -> b885f955f
NIFI-516 adding option to StandardProcessSession.read to close stream Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b885f955 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b885f955 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b885f955 Branch: refs/heads/master Commit: b885f955f4ee97caeaf5c3a28aab967db1be4c94 Parents: 1c17386 Author: Joseph Percivall <[email protected]> Authored: Wed Oct 7 10:50:07 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Mon Oct 26 20:23:13 2015 -0400 ---------------------------------------------------------------------- .../apache/nifi/processor/ProcessSession.java | 27 ++++++++++++++ .../apache/nifi/util/MockProcessSession.java | 8 ++++ .../repository/BatchingSessionFactory.java | 5 +++ .../repository/StandardProcessSession.java | 18 +++++++-- .../repository/TestStandardProcessSession.java | 39 +++++++++++++++++++- .../nifi/processors/standard/MergeContent.java | 6 +-- 6 files changed, 95 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index ed46d68..e1e98d5 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -509,6 +509,33 @@ public interface ProcessSession { void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException; /** + * Executes the given callback against the contents corresponding to the + * given FlowFile. 
+ * + * <i>Note</i>: The InputStream provided to the given InputStreamCallback + * will not be accessible once this method has completed its execution. + * + * @param source flowfile to retrieve content of + * @param allowSessionStreamManagement if true, the session is allowed to hold the stream open for performance reasons; + * if false, the stream is closed as soon as the callback has completed + * @param reader that will be called to read the flowfile content + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content; if an attempt is made to access the InputStream + * provided to the given InputStreamCallback after this method completed its + * execution + */ + void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException; + + /** * Combines the content of all given source FlowFiles into a single given * destination FlowFile. 
* http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 1060854..2045acd 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -400,6 +400,11 @@ public class MockProcessSession implements ProcessSession { @Override public void read(final FlowFile flowFile, final InputStreamCallback callback) { + read(flowFile, false, callback); + } + + @Override + public void read(final FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) { if (callback == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -413,6 +418,9 @@ public class MockProcessSession implements ProcessSession { final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); try { callback.process(bais); + if(!allowSessionStreamManagement){ + bais.close(); + } } catch (final IOException e) { throw new ProcessException(e.toString(), e); } http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java index d5dba82..083510d 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java @@ -188,6 +188,11 @@ public class BatchingSessionFactory implements ProcessSessionFactory { } @Override + public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) { + session.read(source, allowSessionStreamManagement, reader); + } + + @Override public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) { return session.merge(sources, destination); } http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 3ba7e4e..98a0b87 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1770,6 +1770,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void read(final FlowFile source, final InputStreamCallback reader) { + read(source, false, reader); + } + + @Override + public void read(FlowFile source, boolean allowSessionStreamManagement, 
InputStreamCallback reader) { validateRecordState(source); final StandardRepositoryRecord record = records.get(source); @@ -1780,9 +1785,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); - final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { + final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository @@ -1795,6 +1800,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { recursionSet.add(source); reader.process(ffais); + + // Allow processors to close the file after reading to avoid too many files open or do smart session stream management. 
+ if(!allowSessionStreamManagement){ + currentReadClaimStream.close(); + currentReadClaimStream = null; + } } catch (final ContentNotFoundException cnfe) { cnfeThrown = true; throw cnfe; @@ -1806,6 +1817,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw ffais.getContentNotFoundException(); } } + } catch (final ContentNotFoundException nfe) { handleContentNotFound(nfe, record); } catch (final IOException ex) { http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 0e11923..743e185 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -85,6 +85,7 @@ public class TestStandardProcessSession { private StandardProcessSession session; private MockContentRepository contentRepo; private FlowFileQueue flowFileQueue; + private ProcessContext context; private ProvenanceEventRepository provenanceRepo; private MockFlowFileRepository flowFileRepo; @@ -187,7 +188,7 @@ public class TestStandardProcessSession { contentRepo.initialize(new StandardResourceClaimManager()); flowFileRepo = new MockFlowFileRepository(); - final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), 
contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo); + context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo); session = new StandardProcessSession(context); } @@ -329,7 +330,7 @@ public class TestStandardProcessSession { final FlowFile flowFile = session.get(); assertNotNull(flowFile); final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null); - session.read(flowFile, new InputStreamCallback() { + session.read(flowFile, true , new InputStreamCallback() { @Override public void process(final InputStream inputStream) throws IOException { inputStreamHolder.set(inputStream); @@ -721,6 +722,40 @@ public class TestStandardProcessSession { } @Test + public void testManyFilesOpened() throws IOException { + + StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000]; + for(int i = 0; i<70000;i++){ + standardProcessSessions[i] = new StandardProcessSession(context); + + FlowFile flowFile = standardProcessSessions[i].create(); + final byte[] buff = new byte["Hello".getBytes().length]; + + flowFile = standardProcessSessions[i].append(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write("Hello".getBytes()); + } + }); + + try { + standardProcessSessions[i].read(flowFile, false, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buff); + } + }); + } catch (Exception e){ + System.out.println("Failed at file:"+i); + throw e; + } + if(i%1000==0){ + System.out.println("i:"+i); + } + } + } + + @Test public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") 
http://git-wip-us.apache.org/repos/asf/nifi/blob/b885f955/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 2cad11e..afc2705 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -567,7 +567,7 @@ public class MergeContent extends BinFiles { final Iterator<FlowFileSessionWrapper> itr = wrappers.iterator(); while (itr.hasNext()) { final FlowFileSessionWrapper wrapper = itr.next(); - wrapper.getSession().read(wrapper.getFlowFile(), new InputStreamCallback() { + wrapper.getSession().read(wrapper.getFlowFile(), false, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { StreamUtils.copy(in, out); @@ -780,7 +780,7 @@ public class MergeContent extends BinFiles { for (final FlowFileSessionWrapper wrapper : wrappers) { final FlowFile flowFile = wrapper.getFlowFile(); - wrapper.getSession().read(flowFile, new InputStreamCallback() { + wrapper.getSession().read(flowFile, false, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { @@ -893,7 +893,7 @@ public class MergeContent extends BinFiles { try (final OutputStream out = new BufferedOutputStream(rawOut)) { for (final FlowFileSessionWrapper wrapper : wrappers) { final FlowFile flowFile = wrapper.getFlowFile(); - 
wrapper.getSession().read(flowFile, new InputStreamCallback() { + wrapper.getSession().read(flowFile, false, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { boolean canMerge = true;
