Cool, thanks. I'll take a look at that Processor.  I've learned a lot from
looking at the processor in the main project.

Devin

On Thu, Mar 17, 2016 at 10:46 AM, Joe Witt <[email protected]> wrote:

> Devin,
>
> Good stuff.  Unless the API call to change the content completes
> normally, the transformation effectively did not happen.  Take a look
> at CompressContent [1] for an example.
>
> It reads in the original content and streams out new content.  If that
> transformation process fails, it routes the flow file to failure.
> What gets routed is the original representation of the data.
>
> Go ahead and give a really simple form of your case a try.  Do
> something like always throw an exception in your callback to test it,
> and then you can validate that the behavior meets your needs.  In
> fact, that could be a great test case.
>
> [1]
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
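>
> A quick sketch of that test (AlwaysFailTransform is just a placeholder
> name; this assumes the standard ProcessSession/StreamCallback API and
> Java 8 lambdas):
>
>     import java.io.IOException;
>     import java.util.Arrays;
>     import java.util.HashSet;
>     import java.util.Set;
>     import org.apache.nifi.flowfile.FlowFile;
>     import org.apache.nifi.processor.*;
>     import org.apache.nifi.processor.exception.ProcessException;
>
>     public class AlwaysFailTransform extends AbstractProcessor {
>         static final Relationship REL_SUCCESS =
>                 new Relationship.Builder().name("success").build();
>         static final Relationship REL_FAILURE =
>                 new Relationship.Builder().name("failure").build();
>
>         @Override
>         public Set<Relationship> getRelationships() {
>             return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
>         }
>
>         @Override
>         public void onTrigger(ProcessContext context, ProcessSession session) {
>             FlowFile flowFile = session.get();
>             if (flowFile == null) {
>                 return;
>             }
>             try {
>                 // New content only "takes" if the callback completes normally.
>                 flowFile = session.write(flowFile, (in, out) -> {
>                     throw new IOException("simulated transform failure");
>                 });
>                 session.transfer(flowFile, REL_SUCCESS);
>             } catch (ProcessException e) {
>                 // Here the flow file still carries its original content.
>                 session.transfer(flowFile, REL_FAILURE);
>             }
>         }
>     }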
>
> Thanks
> Joe
>
> On Thu, Mar 17, 2016 at 12:36 PM, Devin Fisher
> <[email protected]> wrote:
> > Thanks for the detailed response. The details about the batching were
> > very helpful. I've changed my processors to take advantage of the
> > batching that the framework provides, since my use case fits what you
> > described to a tee.
> >
> > But I have a question about the failure relationship. Maybe a concrete
> > example will help. Let's say I have an XML document that I'm
> > processing with a Processor that converts XML to JSON. This Processor
> > makes use of a SAX parser so it doesn't have to read the whole XML
> > document into memory. The SAX parser reads from an input stream
> > provided by NiFi and calls SAX callbacks based on the XML it
> > encounters. These callbacks write the JSON to an output stream
> > provided by NiFi. So as soon as the Processor starts, it has written
> > to the current Session. Suppose that toward the end of the XML
> > document it hits a syntax error, say a text element with an '&' that
> > has not been encoded as an entity. The SAX parser will fail and will
> > not be able to complete successfully (the parser is very strict), but
> > the processor will already have written most, though not all, of the
> > resulting JSON. At this point, if I transfer it to the failure
> > relationship without rolling back, what will I get? Will I get the
> > incomplete JSON, or will I get the invalid but complete XML? I'm
> > guessing it will be half-written JSON. What I would think you would
> > want is the complete XML, so that a processor that fixes the '&'
> > entity issue could handle it and route it back, or a different
> > processor could log it or do something else.
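> >
> > Roughly, my write callback looks like this (a fragment from inside
> > onTrigger; JsonWritingHandler is my hypothetical SAX handler that
> > emits JSON to the output stream as it parses):
> >
> >     flowFile = session.write(flowFile, (in, out) -> {
> >         try {
> >             SAXParserFactory.newInstance().newSAXParser()
> >                     .parse(in, new JsonWritingHandler(out));
> >         } catch (ParserConfigurationException | SAXException e) {
> >             throw new IOException(e);  // fails the write partway through
> >         }
> >     });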
> >
> > Anyway, I hope that example explains the confusion I still have about
> > handling error cases. Rolling back here would not be helpful, from
> > what I understand, because the document will just get processed again
> > by the same Processor with the SAX parser that can't handle the '&',
> > over and over again. The penalty will make sure that other documents
> > get processed before it is tried again, but that one malformed XML
> > document will never progress, because it will always fail in the same
> > way each time and be rolled back again.
> >
> > Again thanks for the reply. The info was really useful and helped my
> > understanding.
> >
> > Devin
> >
> > On Tue, Mar 15, 2016 at 7:27 PM, Joe Witt <[email protected]> wrote:
> >
> >> Devin,
> >>
> >> So the session follows the unit of work pattern that Martin Fowler
> >> describes [1].  The idea here is that whether you access one flow file
> >> or many flow files or anything in between, you are doing some session
> >> of work.  You can commit all the things you do in that session or
> >> roll back all the things you do in that session.
> >>
> >> This pattern/concept provides a nice and safe environment in which the
> >> contract between the developer and framework is well understood.
> >> Generally, rollback is only necessary when some unplanned or
> >> unexpected exception occurs, and it is largely there so that the
> >> framework can ensure all things are returned to a safe state.  The
> >> framework can also do things like penalize that processor/extension
> >> so that, if there is some programming error, its impact on the system
> >> overall is reduced.
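> >>
> >> A sketch of that contract with a manually managed session (note that
> >> if you extend AbstractProcessor the framework will commit or roll
> >> back for you around onTrigger; this fragment assumes java.util.List
> >> and the standard ProcessSession API):
> >>
> >>     final List<FlowFile> flowFiles = session.get(100);
> >>     try {
> >>         for (final FlowFile flowFile : flowFiles) {
> >>             // ... work on and transfer each flow file ...
> >>         }
> >>         session.commit();        // everything in the session succeeds together
> >>     } catch (final Exception e) {
> >>         session.rollback(true);  // ...or it all returns, penalized, to the queue
> >>         throw new ProcessException(e);
> >>     }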
> >>
> >> So, with that said, there are two ways to think about your case.  It
> >> appears you are doing your own batching, probably for higher
> >> throughput, and it also appears you'd really like to treat each flow
> >> file independently in terms of logic/handling.
> >>
> >> This is precisely why, in addition to this nice clean unit of work
> >> pattern, we also support automated session batching (this is what
> >> Andrew was referring to).  In this mode you can add an annotation to
> >> your processor called @SupportsBatching, which signals to the
> >> framework that it may attempt to automatically combine a series of
> >> subsequent session commits and commit them as a single batch.  In
> >> this way you can build your processor in a very simple single flow
> >> file sort of manner and call commit, but the framework will combine a
> >> series of commits within a very small time window to get higher
> >> throughput.  In the UI a user can signal their willingness to let the
> >> framework do this and acknowledge that they may be trading off some
> >> small latency in favor of higher throughput.  Now there are some
> >> additional things to think about when using this.  For instance, it
> >> is best used when the processor and its function are side-effect
> >> free, meaning that there are no external system state changes or
> >> things like that.  In this sense you can think of the processor
> >> you're building as idempotent (as the REST folks like to say).  If
> >> your processor fits that, then @SupportsBatching can have really
> >> powerful results.
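> >>
> >> A minimal sketch of that shape (PassThrough is a placeholder name;
> >> @SupportsBatching is the real annotation):
> >>
> >>     import java.util.Collections;
> >>     import java.util.Set;
> >>     import org.apache.nifi.annotation.behavior.SupportsBatching;
> >>     import org.apache.nifi.flowfile.FlowFile;
> >>     import org.apache.nifi.processor.*;
> >>
> >>     @SupportsBatching  // user opts in via the UI; commits may be micro-batched
> >>     public class PassThrough extends AbstractProcessor {
> >>         static final Relationship REL_SUCCESS =
> >>                 new Relationship.Builder().name("success").build();
> >>
> >>         @Override
> >>         public Set<Relationship> getRelationships() {
> >>             return Collections.singleton(REL_SUCCESS);
> >>         }
> >>
> >>         @Override
> >>         public void onTrigger(ProcessContext context, ProcessSession session) {
> >>             FlowFile flowFile = session.get();  // one flow file at a time
> >>             if (flowFile == null) {
> >>                 return;
> >>             }
> >>             // ... per-flow-file logic goes here ...
> >>             session.transfer(flowFile, REL_SUCCESS);
> >>             // AbstractProcessor commits when onTrigger returns; with
> >>             // @SupportsBatching the framework may combine those commits.
> >>         }
> >>     }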
> >>
> >> Now, you also mention that some flow files you'd consider failures and
> >> others you'd consider something else, presumably success.  This is
> >> perfect and very common and does not require a rollback.  Keep
> >> rollback in mind for bad stuff that can happen that you don't plan
> >> for.  For the scenario of failures that you can predict, such as
> >> invalid data or invalid state of something, you actually want to have
> >> a failure relationship on that processor and simply route things
> >> there.  From a 'developer' perspective this is not a rollback case.
> >> "Failure" then is as planned for and expected as "success".  So you
> >> go ahead and route the flow file to failure and call commit.  All
> >> good.  It is the person designing this loosely coupled and highly
> >> cohesive set of components together in a flow who gets to decide what
> >> failure means for their context.
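> >>
> >> In code that is simply something like this (a fragment; isValid() and
> >> the relationship names are whatever your processor defines):
> >>
> >>     if (!isValid(flowFile)) {                     // a failure you planned for
> >>         session.transfer(flowFile, REL_FAILURE);  // route it; not a rollback case
> >>         return;
> >>     }
> >>     session.transfer(flowFile, REL_SUCCESS);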
> >>
> >> Lots of info here, probably not well written and with some big gaps.
> >> You're asking the right questions, so just keep asking.  Lots of
> >> folks here want to help.
> >>
> >> [1] http://martinfowler.com/eaaCatalog/unitOfWork.html
> >> [2]
> >> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#session-rollback
> >>
> >> Thanks
> >> Joe
> >>
> >> On Tue, Mar 15, 2016 at 7:25 PM, Devin Fisher
> >> <[email protected]> wrote:
> >> > Thanks for your reply. I'm sorry if my question seems confusing. I'm
> >> > still learning how NiFi works. I don't have any understanding of how
> >> > the framework works on the back end, and only an incomplete
> >> > understanding of the exposed interface. From my point of view (an
> >> > external processor developer), asking to roll back the one flow file
> >> > that failed (I don't want incomplete changes made to it) while
> >> > letting the other n flow files move on seems reasonable. But I don't
> >> > know what is happening in the session on the back end.
> >> >
> >> > I likely don't really understand what happens on a rollback. Reading
> >> > the developer's guide, I got the impression that rollback disregards
> >> > all changes made in the session, including transfers. It then returns
> >> > the flow files to the queue. It would seem that a session is really
> >> > finished and not usable after a rollback. So I don't understand how I
> >> > can handle my use case: I want to roll back (undo changes to the
> >> > single flow file that failed) and then transfer it to the Failed
> >> > relationship unchanged (or add a discard.reason attribute).
> >> >
> >> > I assume you mean "Run duration" when you refer to the 'Scheduling'
> >> > tab. I would love to understand better how that works. In the
> >> > documentation, I only see a note about it in the User Guide, but the
> >> > Developer's Guide is silent. I don't see how that slider is enforced
> >> > in the processor code. It seems that once the framework has ceded
> >> > control to the processor, it can run for as long as it wants. So more
> >> > information about this would be great.
> >> >
> >> > Thanks again for the response. The information is always useful and
> >> > enlightening.
> >> > Devin
> >> >
> >> > On Tue, Mar 15, 2016 at 4:26 PM, Andrew Grande
> >> > <[email protected]> wrote:
> >> >
> >> >> Devin,
> >> >>
> >> >> What you're asking for is a contradictory requirement. One trades
> >> >> individual message transactional control (and the necessary
> >> >> overhead) for higher throughput with micro-batching (but lesser
> >> >> control). In short, you can't expect to roll back a message and not
> >> >> affect the whole batch.
> >> >>
> >> >> However, you could 'commit' the batch as received by your processor
> >> >> and take on the responsibility of storing, tracking, and
> >> >> committing/rolling back those flow files yourself for the downstream
> >> >> connection.... But then, why?
> >> >>
> >> >> In general, one should leverage the NiFi 'Scheduling' tab and have
> >> >> the micro-batching aspect controlled via the framework, unless you
> >> >> really, really have a very good reason to do it yourself.
> >> >>
> >> >> Hope this helps,
> >> >> Andrew
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On 3/7/16, 5:00 PM, "Devin Fisher" <[email protected]>
> >> >> wrote:
> >> >>
> >> >> >Question about rollbacks. I have a processor that is grabbing a
> >> >> >list of FlowFiles from session.get(100). It will then process each
> >> >> >flow file one at a time.  If there is an error with a single
> >> >> >FlowFile, I want to be able to roll it back (and only that failed
> >> >> >FlowFile) and transfer it to the FAILED relationship. But reading
> >> >> >the javadoc for ProcessSession, I don't get the sense that I can do
> >> >> >that.
> >> >> >
> >> >> >Is my workflow wrong? Should I only get one at a time from the
> >> >> >session and commit after each one?
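> >> >> >
> >> >> >Concretely, what I'd like to be able to write is something like
> >> >> >this (a fragment; transform() and the relationship names are
> >> >> >placeholders):
> >> >> >
> >> >> >    for (FlowFile flowFile : session.get(100)) {
> >> >> >        try {
> >> >> >            flowFile = transform(session, flowFile); // may fail per file
> >> >> >            session.transfer(flowFile, REL_SUCCESS);
> >> >> >        } catch (ProcessException e) {
> >> >> >            // undo changes to just this one and route it to FAILED
> >> >> >            session.transfer(flowFile, REL_FAILED);
> >> >> >        }
> >> >> >    }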
> >> >> >
> >> >> >Devin
> >> >>
> >>
>
