Cool, thanks. I'll take a look at that Processor. I've already learned a lot from reading the processors in the main project.
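Something like this is what I'm picturing, modeled loosely on the pattern Joe describes below (just a sketch: transform() is a made-up helper, REL_SUCCESS/REL_FAILURE stand for the processor's own relationships, and the rest is the standard ProcessSession/StreamCallback API):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processor.io.StreamCallback;

    // In a processor extending AbstractProcessor:
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            // Stream the original content in and the transformed content out.
            flowFile = session.write(flowFile, new StreamCallback() {
                @Override
                public void process(final InputStream in, final OutputStream out) throws IOException {
                    transform(in, out); // made-up helper; throws on bad input
                }
            });
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final ProcessException e) {
            // The write never completed, so the flow file routed to failure
            // still carries its original, untouched content.
            session.transfer(flowFile, REL_FAILURE);
        }
    }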
Devin

On Thu, Mar 17, 2016 at 10:46 AM, Joe Witt <[email protected]> wrote:
> Devin,
>
> Good stuff. Unless the API call to change the content completes
> normally, the transformation effectively did not happen. Take a look
> at CompressContent [1] for an example.
>
> It reads in the original content and streams out new content. If that
> transformation fails, it routes the flow file to failure, and what
> gets routed is the original representation of the data.
>
> Go ahead and give a really simple form of your case a try. Do
> something like always throw an exception in your callback to test it,
> and then you can validate that the behavior meets your needs. In
> fact, that could be a great test case.
>
> [1] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
>
> Thanks
> Joe
>
> On Thu, Mar 17, 2016 at 12:36 PM, Devin Fisher <[email protected]> wrote:
> > Thanks for the detailed response. The details about the batching
> > were very helpful. I've changed my processors to take advantage of
> > the batching the framework provides, since my use case fits what
> > you described to a tee.
> >
> > But I have a question about the failure relationship. Maybe a
> > concrete example will help. Let's say I have an XML document that
> > I'm processing with a Processor that converts XML to JSON. This
> > Processor uses a SAX parser so it doesn't have to read the whole
> > XML document into memory. The SAX parser reads from an input stream
> > provided by NiFi and fires SAX callbacks based on the XML it
> > encounters. These callbacks write the JSON to an output stream
> > provided by NiFi, so as soon as the Processor starts it has written
> > to the current Session. Now suppose that, towards the end of the
> > XML document, it hits a syntax error: say a text element with an
> > '&' that has not been encoded as an entity. The SAX parser will
> > fail and will not be able to complete successfully (the parser is
> > very strict), but the Processor will already have written most,
> > though not all, of the resulting JSON. At this point, if I transfer
> > the flow file to the failure relationship without rolling back,
> > what will I get: the incomplete JSON, or the invalid but complete
> > XML? I'm guessing it will be the half-written JSON. What I would
> > think you would want is the complete XML, so that a processor that
> > fixes the '&' entity issue could handle it and route it back, or a
> > different processor could log it or do something else.
> >
> > Anyway, I hope that example explains the confusion I still have
> > about handling error cases. Rolling back here would not be helpful,
> > from what I understand, because the flow file would just be
> > processed again, over and over, by the same Processor with the same
> > SAX parser that can't handle the '&'. The penalty will make sure
> > that other documents get processed before it is tried again, but
> > that one malformed XML document will never progress, because it
> > will always fail in the same way each time and be rolled back
> > again.
> >
> > Again, thanks for the reply. The info was really useful and helped
> > my understanding.
> >
> > Devin
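[A sketch of the test case Joe suggests above, using the nifi-mock TestRunner; ConvertXmlToJson is a hypothetical stand-in for the XML-to-JSON processor being discussed:]

    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class ConvertXmlToJsonTest {

        @Test
        public void testOriginalXmlRoutedToFailureOnParseError() {
            final TestRunner runner = TestRunners.newTestRunner(new ConvertXmlToJson());
            final String badXml = "<doc>this & that</doc>"; // unescaped '&' makes the SAX parser fail
            runner.enqueue(badXml.getBytes(StandardCharsets.UTF_8));
            runner.run();

            // Everything should land on failure, with the ORIGINAL XML intact.
            runner.assertAllFlowFilesTransferred(ConvertXmlToJson.REL_FAILURE, 1);
            final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertXmlToJson.REL_FAILURE).get(0);
            out.assertContentEquals(badXml);
        }
    }

[If the write callback throws partway through, the partial JSON never becomes the flow file's content, which is why the assertion on the original XML should hold.]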
> > On Tue, Mar 15, 2016 at 7:27 PM, Joe Witt <[email protected]> wrote:
> >> Devin,
> >>
> >> The session follows the unit of work pattern that Martin Fowler
> >> describes [1]. The idea is that whether you access one flow file,
> >> many flow files, or anything in between, you are doing some
> >> session of work. You can commit all the things you do on that
> >> session or roll back all the things you do on that session.
> >>
> >> This pattern provides a nice, safe environment in which the
> >> contract between the developer and the framework is well
> >> understood. Generally, rollback is only necessary when some
> >> unplanned or unexpected exception occurs, and it is largely there
> >> so that the framework can ensure all things are returned to a safe
> >> state. The framework can also penalize that processor/extension so
> >> that, if there is some programming error, its impact on the system
> >> overall is reduced.
> >>
> >> With that said, there are two ways to think about your case. It
> >> appears you are doing your own batching, probably for higher
> >> throughput, and it also appears you'd really like to treat each
> >> flow file independently in terms of logic/handling.
> >>
> >> This is precisely why, in addition to this nice clean unit of work
> >> pattern, we also support automated session batching (this is what
> >> Andrew was referring to). In this mode you add the
> >> @SupportsBatching annotation to your processor, which signals to
> >> the framework that it may attempt to automatically combine
> >> subsequent calls to commit and apply them as a single batch. In
> >> this way you can build your processor in a very simple,
> >> single-flow-file sort of manner and call commit, and the framework
> >> will combine a series of commits within a very small time window
> >> to get higher throughput. In the UI, a user can signal their
> >> willingness to let the framework do this, acknowledging that they
> >> may be trading a small amount of latency for higher throughput.
> >> There are some additional things to think about when using this.
> >> For instance, it is best used when the processor and its function
> >> are side-effect free, meaning there are no external system state
> >> changes or the like. In that sense you can think of the processor
> >> you're building as idempotent (as the REST folks like to say). If
> >> your processor fits that description, @SupportsBatching can have
> >> really powerful results.
> >>
> >> Now, you also mention that some flow files you'd consider failures
> >> and others you'd consider something else, presumably success. This
> >> is perfectly fine, very common, and does not require a rollback.
> >> Keep rollback in mind for the bad stuff that can happen that you
> >> don't plan for. For failures you can predict, such as invalid data
> >> or an invalid state of something, you actually want a failure
> >> relationship on the processor, and you simply route things there.
> >> From a developer's perspective this is not a rollback case:
> >> "failure" is as planned for and expected as "success". So you go
> >> ahead and route the flow file to failure and call commit. All
> >> good. It is the person assembling this loosely coupled, highly
> >> cohesive set of components into a flow who gets to decide what
> >> failure means for their context.
> >>
> >> Lots of info here, probably not well written and with some big
> >> gaps. You're asking the right questions, so just keep asking. Lots
> >> of folks here want to help.
> >>
> >> [1] http://martinfowler.com/eaaCatalog/unitOfWork.html
> >> [2] https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#session-rollback
> >>
> >> Thanks
> >> Joe
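[In code, the batching Joe describes looks roughly like this; @SupportsBatching and @SideEffectFree are the real annotations from org.apache.nifi.annotation.behavior, while the MyTransform processor is hypothetical:]

    import org.apache.nifi.annotation.behavior.SideEffectFree;
    import org.apache.nifi.annotation.behavior.SupportsBatching;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;

    @SupportsBatching // lets a user trade a little latency for batched commits
    @SideEffectFree   // batching is safest when the processor has no external side effects
    public class MyTransform extends AbstractProcessor {

        static final Relationship REL_SUCCESS = new Relationship.Builder()
                .name("success")
                .build();

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) {
            final FlowFile flowFile = session.get(); // simple one-flow-file-at-a-time style
            if (flowFile == null) {
                return;
            }
            // ... transform and route ...
            session.transfer(flowFile, REL_SUCCESS);
            // AbstractProcessor commits the session when onTrigger returns; with
            // @SupportsBatching the framework may fold several such commits into
            // one small batch behind the scenes.
        }
    }

[As I understand it, this is also what the 'Run duration' slider on the processor's Scheduling tab hooks into; it is only offered for processors that carry @SupportsBatching.]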
> >> On Tue, Mar 15, 2016 at 7:25 PM, Devin Fisher <[email protected]> wrote:
> >> > Thanks for your reply. I'm sorry if my question seems confusing;
> >> > I'm still learning how NiFi works. I don't have any
> >> > understanding of how the framework works on the back end, and
> >> > only an incomplete understanding of the exposed interface. From
> >> > my point of view (an external processor developer), asking to
> >> > roll back the one flow file that failed (I don't want changes
> >> > made to it left incomplete) while letting the other n flow files
> >> > move on seems reasonable. But I don't know what is happening in
> >> > the session on the back end.
> >> >
> >> > I likely don't really understand what happens on a rollback.
> >> > Reading the Developer Guide, I got the impression that rollback
> >> > disregards all changes made in the session, including transfers,
> >> > and then returns the flow files to the queue. It would seem that
> >> > a session is really finished and not usable after a rollback. So
> >> > I don't understand how I can do my use case: I want to roll back
> >> > (undo changes to the single flow file that failed) and then
> >> > transfer it to the Failed relationship unchanged (or with a
> >> > discard.reason added to its attributes).
> >> >
> >> > I assume you mean "Run duration" when you refer to the
> >> > 'Scheduling' tab. I would love to understand better how that
> >> > works. In the documentation I only see a note about it in the
> >> > User Guide; the Developer Guide is silent. I don't see how that
> >> > slider is enforced in the processor code. It seems that once the
> >> > framework has ceded control to the processor, it can run for as
> >> > long as it wants. So more information about this would be great.
> >> >
> >> > Thanks again for the response. The information is always useful
> >> > and enlightening.
> >> > Devin
> >> >
> >> > On Tue, Mar 15, 2016 at 4:26 PM, Andrew Grande <[email protected]> wrote:
> >> >
> >> >> Devin,
> >> >>
> >> >> What you're asking for is a contradictory requirement. One
> >> >> trades individual message transactional control (and its
> >> >> necessary overhead) for the higher throughput of micro-batching
> >> >> (but lesser control). In short, you can't expect to roll back a
> >> >> message and not affect the whole batch.
> >> >>
> >> >> However, you could 'commit' the batch as received by your
> >> >> processor and take on the responsibility of storing, tracking,
> >> >> and committing/rolling back those flow files yourself for the
> >> >> downstream connection... But then, why?
> >> >>
> >> >> In general, one should leverage NiFi's 'Scheduling' tab and let
> >> >> the framework control the micro-batching aspect, unless you
> >> >> really, really have a very good reason to do it yourself.
> >> >>
> >> >> Hope this helps,
> >> >> Andrew
> >> >> On 3/7/16, 5:00 PM, "Devin Fisher" <[email protected]> wrote:
> >> >>
> >> >> > Question about rollbacks. I have a processor that grabs a
> >> >> > list of FlowFiles from session.get(100) and then processes
> >> >> > each flow file one at a time. If there is an error with a
> >> >> > single FlowFile, I want to be able to roll back that FlowFile
> >> >> > (and only that failed FlowFile) and transfer it to the FAILED
> >> >> > relationship. But reading the javadoc for ProcessSession, I
> >> >> > don't get the sense that I can do that.
> >> >> >
> >> >> > Is my workflow wrong? Should I get only one flow file at a
> >> >> > time from the session and commit after each one?
> >> >> >
> >> >> > Devin
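[Pulling the thread together, a sketch of the shape the discussion converges on: take the batch, handle each flow file independently, and route predictable failures instead of rolling back. As before, transform() is a made-up helper and REL_SUCCESS/REL_FAILURE stand for the processor's own relationships:]

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.List;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processor.io.StreamCallback;

    // In a processor extending AbstractProcessor:
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final List<FlowFile> flowFiles = session.get(100);
        if (flowFiles.isEmpty()) {
            return;
        }
        for (final FlowFile original : flowFiles) {
            try {
                final FlowFile transformed = session.write(original, new StreamCallback() {
                    @Override
                    public void process(final InputStream in, final OutputStream out) throws IOException {
                        transform(in, out); // made-up helper
                    }
                });
                session.transfer(transformed, REL_SUCCESS);
            } catch (final ProcessException e) {
                // Only this flow file is affected; it keeps its original content
                // and the rest of the batch proceeds normally. No rollback needed.
                final FlowFile failed = session.putAttribute(original, "discard.reason", String.valueOf(e.getMessage()));
                session.transfer(failed, REL_FAILURE);
            }
        }
        // One commit covers the whole batch; AbstractProcessor issues it after
        // onTrigger returns.
    }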
