Thanks for the response. It gave me additional context so that I can
proceed with more confidence. I've been experimenting with creating
additional sessions and I believe that it will work for my use case. I will
keep the potential duplicate flowfile issue in mind, though.

Thanks again.

Devin

On Sun, Mar 13, 2016 at 12:40 PM, Mark Payne <marka...@hotmail.com> wrote:

> Devin,
>
> We do realize that we have some work to do in order to make it so that
> a single Processor can buffer up hundreds of thousands or more FlowFiles.
> The SplitText processor is very popular and suffers from this exact same
> problem.
> We want to have a mechanism for swapping those out of the Java Heap,
> similar
> to how we do when we have millions of FlowFiles sitting in a queue. There
> is a ticket
> here [1] to address this. However, this has turned out to be very time
> consuming, and
> not quite as straightforward as we had hoped, so it's not been finished up
> yet.
>
> In the meantime, you can use the approach that you described, using two
> different
> Process Sessions, by extending AbstractSessionFactoryProcessor instead of
> AbstractProcessor. The downside to this approach, though, is that when
> NiFi is restarted,
> you could potentially have a lot of data duplication.
>
> As an example, let's imagine that you create a ProcessSession and use it
> to create 10,000 FlowFiles
> and then commit the session and create a new one. If you have an incoming
> FlowFile that has
> 1 million rows in it, you may create 800,000 FlowFiles and send them out
> and then NiFi gets restarted.
> In this case, you will pick up the original FlowFile and begin processing
> it again. But you've already sent
> out those 800,000 FlowFiles. Depending on your requirements, this may or
> may not be acceptable.
>
> One option that you could use is just to document that this behavior
> exists and that SplitText should be
> used ahead of your Processor in order to split the content into 10,000 line
> chunks. This would avoid the
> heap exhaustion.
>
> Another possible solution that you could use, though it's not as pretty as
> I'd like: Process up to 10,000 FlowFiles
> from an input FlowFile. Then, add an attribute to the input FlowFile
> indicating your progress (for instance,
> add an attribute named "rows.converted") and then do
> "session.transfer(flowFile);". This will transfer the FlowFile
> back into its input queue. You can then commit the session. Then, when you
> call session.get() to get an input
> FlowFile again, you can check for that attribute and skip that many rows.
> This way, you won't end up with
> data duplication. The downside here is that you would end up reading the
> first N rows each time and ignoring
> the content which can be expensive. A more optimized approach would be to
> wrap the InputStream in
> a ByteCountingInputStream and record the number of bytes consumed and use
> that as an attribute, and then
> for each subsequent iteration use StreamUtils.skip() to skip the
> appropriate number of bytes.
>
> I know there's a lot of info here. Let me know if anything doesn't make
> sense.
>
> I hope this helps!
> -Mark
>
>
> [1] https://issues.apache.org/jira/browse/NIFI-1008
>
>
> > On Mar 11, 2016, at 5:29 PM, Devin Fisher <
> devin.fis...@perfectsearchcorp.com> wrote:
> >
> > I'm creating a processor that will read a customer csv and will create a
> > new flowfile for each line in the form of XML. The CSV file will be quite
> > large (100s of thousands of lines). I would like to commit a reasonable
> > amount from time to time so that they can flow down to other processors.
> > But looking at similar processors SplitText and SplitXml they save up all
> > the created flowfiles and release them all at the end.  In some trials,
> I'm
> > running out of memory doing that. But I can't commit the session early
> > because I'm still reading the original CSV file.  Is there a workflow
> where
> > I can read the incoming CSV flowfile but still release created flowfiles?
> > I'm thinking of not using AbstractProcessor and instead
> > use AbstractSessionFactoryProcessor and create two different sessions but
> > is that advisable or possible?
> >
> > Devin
>
>

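For readers following the thread, the byte-offset variant that Mark describes
(record how many bytes were consumed, then skip that many on the next pass)
can be sketched with plain java.io, without any NiFi classes. This is only an
illustration under stated assumptions: ResumableBatchReader, Batch and
readBatch are hypothetical names, the manual counter stands in for NiFi's
ByteCountingInputStream, the skip loop stands in for StreamUtils.skip(), and
rows are assumed to be "\n"-delimited UTF-8 text.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of NiFi: reads at most maxRows lines starting
// at a byte offset and reports the offset to resume from on the next pass.
final class ResumableBatchReader {

    /** Rows read in one pass plus the byte offset where the next pass should start. */
    static final class Batch {
        final List<String> rows;
        final long nextOffset;

        Batch(List<String> rows, long nextOffset) {
            this.rows = rows;
            this.nextOffset = nextOffset;
        }
    }

    static Batch readBatch(InputStream content, long startOffset, int maxRows) {
        try (InputStream in = new BufferedInputStream(content)) {
            // Skip the bytes handled by earlier passes. InputStream.skip() may
            // skip fewer bytes than requested, so loop until done.
            long remaining = startOffset;
            while (remaining > 0) {
                long skipped = in.skip(remaining);
                if (skipped > 0) {
                    remaining -= skipped;
                } else if (in.read() >= 0) {
                    remaining--;
                } else {
                    throw new EOFException("start offset is past the end of the content");
                }
            }

            List<String> rows = new ArrayList<>();
            ByteArrayOutputStream line = new ByteArrayOutputStream();
            long offset = startOffset;
            int b;
            // Read byte by byte so the counter matches exactly what was consumed;
            // no byte is read once maxRows complete rows have been collected.
            while (rows.size() < maxRows && (b = in.read()) != -1) {
                offset++;
                if (b == '\n') {
                    rows.add(new String(line.toByteArray(), StandardCharsets.UTF_8));
                    line.reset();
                } else {
                    line.write(b);
                }
            }
            // A final row without a trailing newline is only seen at end of stream.
            if (line.size() > 0) {
                rows.add(new String(line.toByteArray(), StandardCharsets.UTF_8));
            }
            return new Batch(rows, offset);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a processor, nextOffset would presumably be stored as an attribute on the
input FlowFile before it is transferred back to its queue, and read back as
startOffset on the next pass. That wiring depends on the NiFi API and is left
out of the sketch.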