Eric,

The session holds onto all ‘active FlowFiles’ as well as other information 
about them, such as which relationship they’ve been transferred to and which 
attributes have been added or removed. So if you have a separate session for 
each outbound FlowFile, then each time that session is committed, the 
FlowFiles it holds can be removed from the session and placed in the next 
queue. This is important because the session holds all FlowFiles it knows 
about in memory, but the queues will swap FlowFiles out: writing them to disk 
and then dropping them from memory/heap. This allows us to hold many millions 
of FlowFiles within NiFi at a time. However, if you’re trying to hold a 
million FlowFiles in heap, as a single session would, you’re going to use a 
huge amount of heap. This is predominantly due to the HashMap that is used to 
store attributes. Those maps can quickly use up a huge amount of heap.
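
To make the difference concrete, here is a toy model of the behaviour 
described above. It is only a sketch under stated assumptions: 
SessionHeapSketch, ToyFlowFile and peakHeldInSession are hypothetical names 
and not part of NiFi's API, and the attribute key is purely illustrative. The 
model just counts how many FlowFile-like objects (each carrying its own 
attribute HashMap) sit in a session before a commit hands them to a queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are NiFi classes.
class SessionHeapSketch {

    // Hypothetical stand-in for a FlowFile: just its attribute map, held on heap.
    static final class ToyFlowFile {
        final Map<String, String> attributes = new HashMap<>();
    }

    // Returns the largest number of toy FlowFiles held by the "session" at once
    // when a commit happens after every recordsPerCommit created children.
    static int peakHeldInSession(int totalRecords, int recordsPerCommit) {
        if (totalRecords < 0 || recordsPerCommit < 1) {
            throw new IllegalArgumentException("totalRecords >= 0 and recordsPerCommit >= 1 required");
        }
        List<ToyFlowFile> heldBySession = new ArrayList<>();
        int peak = 0;
        for (int i = 0; i < totalRecords; i++) {
            ToyFlowFile child = new ToyFlowFile();
            child.attributes.put("fragment.index", Integer.toString(i)); // illustrative attribute
            heldBySession.add(child);
            peak = Math.max(peak, heldBySession.size());
            if (heldBySession.size() == recordsPerCommit) {
                // "Commit": the session lets go of its FlowFiles; in NiFi they would
                // move to the next queue, which can swap them out to disk.
                heldBySession.clear();
            }
        }
        heldBySession.clear(); // final commit for any remainder
        return peak;
    }

    public static void main(String[] args) {
        int records = 1_000_000;
        System.out.println("one session per FlowFile: peak held = " + peakHeldInSession(records, 1));
        System.out.println("one session for all:      peak held = " + peakHeldInSession(records, records));
    }
}
```

With recordsPerCommit set to 1 (one session per outbound FlowFile) the 
session never holds more than one child at a time, while a single session for 
the whole input holds every child, and every attribute map, until the final 
commit.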

Thanks
-Mark


> On Mar 8, 2021, at 12:53 PM, Eric Secules <[email protected]> wrote:
> 
> Hi Mark,
> 
> Thanks for the example code and the considerations. I see the error I made
> before.
> If I were to use the standard way and put this all in one session what are
> the memory characteristics? What is tracked in memory for a session and is
> this actually any different (in overall memory use) from my approach of
> using a separate session for output?
> 
> Thanks,
> Eric
> 
> On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <[email protected]> wrote:
> 
>> Hi Eric,
>> 
>> We should definitely throw a better Exception there, rather than
>> NullPointerException. But what you’re looking to do can be done. It’s
>> slightly more nuanced than that, though. What you’ll need to do, in order
>> to maintain the lineage, is to create the child FlowFile in the session
>> that owns the parent. You can then “migrate” the child FlowFile to a new
>> session. Something to the effect of:
>> 
>> -----------------
>> ProcessSession inSession = sessionFactory.create();
>> FlowFile flowFile = inSession.get();
>> if (flowFile == null) {
>>  return;
>> }
>> 
>> ProcessSession outSession = sessionFactory.create();
>> 
>> try (InputStream in = inSession.read(flowFile)) {
>>  Record record;
>>  while ((record = getRecord(in)) != null) {
>>      FlowFile child = inSession.create(flowFile);
>>      try (OutputStream out = inSession.write(child)) {
>>                // write some data.
>>      }
>> 
>>      inSession.migrate(outSession, Collections.singleton(child));
>>      outSession.transfer(child, REL_SUCCESS);
>>      outSession.commit();
>>  }
>> }
>> 
>> inSession.transfer(flowFile, REL_ORIGINAL);
>> inSession.commit();
>> -----------------
>> 
>> Some things to keep in mind, though:
>> - To get a session factory, you should extend
>> AbstractSessionFactoryProcessor instead of AbstractProcessor.
>> - This means you need to ensure that you always explicitly commit/rollback
>> sessions and handle Exceptions properly, ensure that you account for any
>> FlowFile that is created, etc.
>> - If your incoming FlowFile contains 1 million records, and you process
>> 900,000 of them and then NiFi is restarted, then on restart it’ll reprocess
>> the incoming FlowFile, so you’ll end up with 900,000 duplicates.
>> - Performance will be very subpar, as you’ll be creating and committing up
>> to 1 million sessions per FlowFile. Perhaps this is okay if you only have
>> to process one of these files per 24 hours. But it’s worth considering.
>> 
>> Thanks
>> -Mark
>> 
>> 
>> 
>>> On Mar 5, 2021, at 5:02 PM, Eric Secules <[email protected]> wrote:
>>> 
>>> Hi Joe,
>>> 
>>> I was able to get it working by using one session to manage the parent
>>> flowfile and then one session per split file. I couldn't do
>>> splitSession.create(inputFF); I was getting an NPE and another exception,
>>> but it worked with splitSession.create(). The downside is that I lose the
>>> lineage connection to the parent.
>>> 
>>>> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
>>>> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
>>>> Failure FF: null: java.lang.NullPointerException
>>>> java.lang.NullPointerException: null
>>>> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
>>>> at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
>>>> at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
>>>> 
>>> 
>>> This log is from a 1.11.0 build of NiFi
>>> 
>>> In my case the input is a proprietary text file with a gigantic schema
>>> definition which we translate to JSON and split the results all at once.
>>> I don't know whether record-based processing works for us because of how
>>> fluid the JSON schema is.
>>> 
>>> Thanks,
>>> Eric
>>> 
>>> On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <[email protected]> wrote:
>>> 
>>>> Eric,
>>>> 
>>>> My point is that it sounds like you get handed an original document
>>>> which is a JSON document.  It contains up to a million elements within
>>>> it.  You would implement a record reader for your original doc
>>>> structure and then you can use any of our current writers/etc..  But
>>>> the important part is avoiding creating splits unless/until totally
>>>> necessary/etc..
>>>> 
>>>> Anyway if you go the route you're thinking of I think you'll need a
>>>> different session for reading (single session for the entire source
>>>> file) and a different session for all the splits you'll create. But I
>>>> might be over complicating that.  MarkP could give better input.
>>>> 
>>>> Thanks
>>>> 
>>>> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <[email protected]> wrote:
>>>>> 
>>>>> Hi Joe,
>>>>> 
>>>>> For my use case partial results are okay.
>>>>> The files may contain up to a million records. But we have like a day
>>>>> to process it. We will consider record-based processing. It might be a
>>>>> longer task to convert our flows to consume records instead of single
>>>>> files.
>>>>> Will I need to have multiple sessions to handle all this?
>>>>> 
>>>>> Thanks,
>>>>> Eric
>>>>> 
>>>>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <[email protected]> wrote:
>>>>> 
>>>>>> Eric
>>>>>> 
>>>>>> The ProcessSession follows a unit of work pattern.  You can do a lot
>>>>>> of things but until you commit the session it won't actually commit the
>>>>>> change(s).  So if you want the behavior you describe call commit after
>>>>>> transfer each time.  This is done automatically for you in most cases
>>>>>> but you can call it to control the boundary.  Just remember you risk
>>>>>> partial results then.  Consider you're reading the input file which
>>>>>> contains 100 records, let's say.  On record 51 there is a processing
>>>>>> issue.  What happens then?    I'd also suggest this pattern generally
>>>>>> results in poor performance.  Can you not use the record
>>>>>> reader/writers to accomplish this so you can avoid turning it into a
>>>>>> bunch of tiny flowfiles?
>>>>>> 
>>>>>> Thanks
>>>>>> 
>>>>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <[email protected]> wrote:
>>>>>>> 
>>>>>>> Hello,
>>>>>>> 
>>>>>>> I am trying to write a processor which parses an input file and emits
>>>>>>> one JSON flowfile for each record in the input file. Currently we're
>>>>>>> calling session.transfer() once we encounter a fragment we want to
>>>>>>> emit. But it's not sending the new flowfiles to the next processor as
>>>>>>> it processes the input flowfile. Instead it's holding everything until
>>>>>>> the input is fully processed and releasing it all at once. Is there
>>>>>>> some way I can write the processor to emit flowfiles as soon as
>>>>>>> possible rather than waiting for everything to succeed?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Eric
>>>>>> 
>>>> 
>> 
>> 
