Hi Mark,

Thanks for the example code and the considerations. I see the error I made
before.
If I were to use the standard approach and put this all in one session, what
are the memory characteristics? What is tracked in memory for a session, and
is this actually any different (in overall memory use) from my approach of
using a separate session for output?

Thanks,
Eric
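
For reference, the unit-of-work behaviour Joe describes further down the
thread can be modelled with a toy class. This is only a sketch: ToySession,
commitOnce and commitEach are made-up names, not NiFi's ProcessSession API,
and it says nothing about how much memory NiFi really uses. It only shows
that transfers buffered in a single session pile up until commit, while
committing after each transfer keeps at most one item pending.

```java
import java.util.ArrayList;
import java.util.List;

public class UnitOfWorkSketch {

    /** Toy stand-in for a session (NOT NiFi's API): transfers are buffered until commit(). */
    static class ToySession {
        private final List<String> pending = new ArrayList<>();
        private final List<String> downstream;
        private int peakPending = 0;

        ToySession(List<String> downstream) {
            this.downstream = downstream;
        }

        /** Buffers the item; nothing reaches downstream yet. */
        void transfer(String item) {
            pending.add(item);
            peakPending = Math.max(peakPending, pending.size());
        }

        /** Releases everything buffered so far to downstream. */
        void commit() {
            downstream.addAll(pending);
            pending.clear();
        }

        int peakPending() {
            return peakPending;
        }
    }

    /** One commit at the end; returns the largest number of items held at once. */
    public static int commitOnce(int records, List<String> downstream) {
        ToySession session = new ToySession(downstream);
        for (int i = 0; i < records; i++) {
            session.transfer("record-" + i);
        }
        session.commit();
        return session.peakPending();
    }

    /** Commit after every transfer; returns the largest number of items held at once. */
    public static int commitEach(int records, List<String> downstream) {
        ToySession session = new ToySession(downstream);
        for (int i = 0; i < records; i++) {
            session.transfer("record-" + i);
            session.commit();
        }
        return session.peakPending();
    }

    public static void main(String[] args) {
        System.out.println("commit once: peak pending = " + commitOnce(5, new ArrayList<>()));
        System.out.println("commit each: peak pending = " + commitEach(5, new ArrayList<>()));
    }
}
```

With five records, main reports a peak of 5 pending items for the single
commit and 1 for commit-per-record; both variants deliver all five items
downstream.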

On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <[email protected]> wrote:

> Hi Eric,
>
> We should definitely throw a better Exception there, rather than
> NullPointerException. But what you’re looking to do can be done. It’s
> slightly more nuanced than that, though. What you’ll need to do, in order
> to maintain the lineage, is to create the child FlowFile in the session
> that owns the parent. You can then “migrate” the child FlowFile to a new
> session. Something to the effect of:
>
> -----------------
> // ProcessSessionFactory exposes createSession()
> ProcessSession inSession = sessionFactory.createSession();
> FlowFile flowFile = inSession.get();
> if (flowFile == null) {
>   return;
> }
>
> ProcessSession outSession = sessionFactory.createSession();
>
> try (InputStream in = inSession.read(flowFile)) {
>   Record record;
>   // getRecord(...) is a placeholder for your own parser; null means end of input
>   while ((record = getRecord(in)) != null) {
>       // create the child in the session that owns the parent, to keep lineage
>       FlowFile child = inSession.create(flowFile);
>       try (OutputStream out = inSession.write(child)) {
>           // write some data.
>       }
>
>       // hand the child over to the output session and release it right away
>       inSession.migrate(outSession, Collections.singleton(child));
>       outSession.transfer(child, REL_SUCCESS);
>       outSession.commit();
>   }
> }
>
> inSession.transfer(flowFile, REL_ORIGINAL);
> inSession.commit();
> -----------------
>
> Some things to keep in mind, though:
> - To get a session factory, you should extend
> AbstractSessionFactoryProcessor instead of AbstractProcessor.
> - This means you need to ensure that you always explicitly commit/rollback
> sessions and handle Exceptions properly, ensure that you account for any
> FlowFile that is created, etc.
> - If your incoming FlowFile has 1 million records, and you process
> 900,000 of them and then NiFi is restarted, then on restart it’ll reprocess
> the incoming FlowFile, so you’ll end up with 900,000 duplicates.
> - Performance will be very subpar, as you’ll be creating and committing up
> to 1 million sessions per FlowFile. Perhaps this is okay if you only have
> to process one of these files per 24 hours. But it’s worth considering.
>
> Thanks
> -Mark
>
>
>
> > On Mar 5, 2021, at 5:02 PM, Eric Secules <[email protected]> wrote:
> >
> > Hi Joe,
> >
> > I was able to get it working by using one session to manage the parent
> > flowfile and then one session per split file. I couldn't do
> > splitSession.create(inputFF); I was getting an NPE and another exception.
> > It worked with splitSession.create(), but the downside is that I lose the
> > lineage connection to the parent.
> >
> >> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
> >> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
> >> Failure FF: null: java.lang.NullPointerException
> >> java.lang.NullPointerException: null
> >> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
> >> at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
> >> at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
> >>
> >
> > This log is from a 1.11.0 build of NiFi
> >
> > In my case the input is a proprietary text file with a gigantic schema
> > definition which we translate to JSON and split the results all at once.
> > I don't know whether record-based processing works for us because of how
> > fluid the JSON schema is.
> >
> > Thanks,
> > Eric
> >
> > On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <[email protected]> wrote:
> >
> >> Eric,
> >>
> >> My point is that it sounds like you get handed an original document
> >> which is a JSON document. It contains up to a million elements within
> >> it. You would implement a record reader for your original doc
> >> structure and then you can use any of our current writers, etc. But
> >> the important part is avoiding creating splits unless/until totally
> >> necessary.
> >>
> >> Anyway, if you go the route you're thinking of, I think you'll need a
> >> different session for reading (single session for the entire source
> >> file) and a different session for all the splits you'll create. But I
> >> might be overcomplicating that. MarkP could give better input.
> >>
> >> Thanks
> >>
> >> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <[email protected]> wrote:
> >>>
> >>> Hi Joe,
> >>>
> >>> For my use case partial results are okay.
> >>> The files may contain up to a million records. But we have like a day
> >>> to process it. We will consider record-based processing. It might be a
> >>> longer task to convert our flows to consume records instead of single
> >>> files.
> >>> Will I need to have multiple sessions to handle all this?
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <[email protected]> wrote:
> >>>
> >>>> Eric
> >>>>
> >>>> The ProcessSession follows a unit of work pattern. You can do a lot
> >>>> of things, but until you commit the session it won't actually commit the
> >>>> change(s). So if you want the behavior you describe, call commit after
> >>>> transfer each time. This is done automatically for you in most cases,
> >>>> but you can call it to control the boundary. Just remember you risk
> >>>> partial results then. Consider you're reading the input file, which
> >>>> contains 100 records, let's say. On record 51 there is a processing
> >>>> issue. What happens then? I'd also suggest this pattern generally
> >>>> results in poor performance. Can you not use the record
> >>>> reader/writers to accomplish this so you can avoid turning it into a
> >>>> bunch of tiny flowfiles?
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <[email protected]> wrote:
> >>>>>
> >>>>> Hello,
> >>>>>
> >>>>> I am trying to write a processor which parses an input file and
> >>>>> emits one JSON flowfile for each record in the input file. Currently
> >>>>> we're calling session.transfer() once we encounter a fragment we want
> >>>>> to emit. But it's not sending the new flowfiles to the next processor
> >>>>> as it processes the input flowfile. Instead it's holding everything
> >>>>> until the input is fully processed and releasing it all at once. Is
> >>>>> there some way I can write the processor to emit flowfiles as soon as
> >>>>> possible rather than waiting for everything to succeed?
> >>>>>
> >>>>> Thanks,
> >>>>> Eric
> >>>>
> >>
>
>
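
Mark's restart caveat in the thread above (the incoming FlowFile is
reprocessed from the start after a restart, so everything already committed
is emitted again) can be illustrated with a small simulation. This is a
sketch with invented names (run, countDuplicates, crashAfter) and plain
integers standing in for records; it does not use or model NiFi's
repositories.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class RestartDuplicatesSketch {

    /**
     * Emits one output per record and treats each as committed immediately.
     * Stops before record index crashAfter to simulate a restart that happens
     * before the input itself was committed; pass -1 for an uninterrupted run.
     */
    public static void run(int totalRecords, int crashAfter, List<Integer> downstream) {
        for (int i = 0; i < totalRecords; i++) {
            if (i == crashAfter) {
                return; // simulated restart: the input is still queued
            }
            downstream.add(i); // already released, cannot be taken back
        }
    }

    /** Number of outputs that appear more than once downstream. */
    public static int countDuplicates(List<Integer> downstream) {
        return downstream.size() - new HashSet<>(downstream).size();
    }

    public static void main(String[] args) {
        List<Integer> downstream = new ArrayList<>();
        run(10, 9, downstream);  // first attempt stops after 9 records
        run(10, -1, downstream); // after the restart the whole input is reprocessed
        System.out.println("outputs = " + downstream.size()
                + ", duplicates = " + countDuplicates(downstream));
    }
}
```

With 10 records and an interruption after 9 of them, the second pass
re-emits all 10, leaving 19 outputs of which 9 are duplicates. This is the
same shape as the 900,000-duplicate case Mark describes.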
