Russell,

Glad you found a working solution. Maybe you could write up your findings and share them with a broader audience. I have often found that the best explanations are written by people who were recently in the “how do I do X?” state: they are closest to the problem and can walk through how they came to understand it. Someone who works on these methods day in and day out may not write for the right audience or convey that experience as well.
Andy LoPresto
alopre...@apache.org
alopresto.apa...@gmail.com
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69

> On Aug 27, 2020, at 10:10 AM, Russell Bateman <r...@windofkeltia.com> wrote:
>
> I needed to get back here...
>
> I took this advice to heart and finished my processor. Thanks to Matt and
> Mark for all their suggestions! They cleared up a few things. There was one
> bug in the code that was mine, small but significant in its effect on the
> rest. That mistake also explained why I thought the uuid was identical between
> at least two of the cloned flowfiles. What I would wish for, and am probably
> not strong enough to write, would be a synthesis of the session methods
> read() and write() and how best to use them (one-to-one, one-to-many, etc.).
> Javadoc is too paratactic by nature, and the NiFi Developer's Guide is almost
> silent on these methods. If it were not for the many existing examples using
> these methods, it would be hard to learn to do even simple things. I did look
> for something closer to what I needed to do, but unsuccessfully.
>
> Thanks again. If anything, the NiFi mailing lists are a place both for great
> information and being treated well.
>
> Russ
>
> On 8/25/20 12:24 PM, Mark Payne wrote:
>> Russ,
>>
>> Several comments here. I’ve included them inline, below.
>>
>> Hope it’s helpful.
>>
>> Thanks
>> -Mark
>>
>>
>>> On Aug 25, 2020, at 2:09 PM, Russell Bateman <r...@windofkeltia.com> wrote:
>>>
>>> Thanks for your suggestions, Matt.
>>>
>>> I decided to keep the original flowfile only upon failure. So, I have the
>>> embedded-document file and the serialized POJOs created from processing the
>>> non-embedded-document part as the result if successful. (Condensed code at
>>> end...)
>>>
>>> Now I have three questions...
>>>
>>> 1. I seem not to have placated NiFi with the assurance that I have
>>> transferred or disposed of all three flowfiles suitably.
>>> I get:
>>>
>>> java.lang.AssertionError:
>>> org.apache.nifi.processor.exception.FlowFileHandlingException: Cannot
>>> commit session because the following FlowFiles have not been removed or
>>> transferred: [2]
>> This is probably because at the end of the block, you catch Exception and
>> then route the original FlowFile to failure. But you’ve already cloned it
>> and didn’t deal with the clone.
>>> *Which of the three flowfiles does [2] refer to? Or does it just mean I
>>> botched two flowfiles?*
>>>
>>> 2. session.clone() generates a new flowfile with the identical uuid. I don't
>>> think I want the result to be two flowfiles with the same uuid. I am
>>> binding them together so I can associate them later using attribute
>>> embedded-document. *Should I/How do I force cloning to acquire new
>>> uuids?*
>> This appears to actually be a bug in the mock framework. It *should* have
>> a unique uuid, and would in a running NiFi instance. Feel free to file a
>> Jira for that.
>>> 3. A question on theory... *Wouldn't all of this cloning be expensive?*
>>> Should I just clone for one of the new files and then mangle the original
>>> flowfile to become the other?
>> session.clone() is not particularly expensive. It’s just creating a new
>> FlowFile object. It doesn’t clone the FlowFile’s contents.
>> That said, it is probably more appropriate to call session.create(flowFile),
>> rather than session.clone(flowFile). It makes little difference in practice,
>> but what you’re really doing is forking a child, and that will come across
>> more cleanly in the Provenance lineage that is generated if using
>> session.create(flowFile).
>>
>> Additional comments in code below.
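The thread offers two explanations for the commit error. Mark attributes it to the `catch` block leaving the clone unhandled. Russ later mentions a bug of his own that also explained the shared uuid, but he never names it. One plausible candidate, offered here only as a guess, is the line `concepts = session.putAttribute( document, "embedded-document", UUID );` in the condensed code. Because `putAttribute()` returns a new version of the FlowFile it is given, that line leaves `concepts` pointing at the document FlowFile, and the real concepts child is never transferred.

The plain-Java toy model below makes both explanations concrete. It is hypothetical: `ToySession`, `ToyFlowFile` and their methods are invented for this sketch and are not NiFi classes. The method names echo `ProcessSession`, children are created with `create(parent)` as Mark suggests, and the id stands in for the uuid. The model covers only two rules:

* every FlowFile handed out must be transferred or removed before commit;
* FlowFiles are immutable.

It ignores the other checks the real session performs. The id it prints is its own and is not meant to match the `[2]` in Russ's test output. The sketch assumes Java 17.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model (hypothetical; NOT NiFi's ProcessSession). It models only two rules:
// (1) every FlowFile handed out must be transferred or removed before commit(), and
// (2) FlowFiles are immutable, so putAttribute() returns a new version with the same id.
public class SessionAccountingSketch {

    record ToyFlowFile(long id, Map<String, String> attributes) {
        ToyFlowFile withAttribute(String key, String value) {
            Map<String, String> copy = new HashMap<>(attributes);
            copy.put(key, value);
            return new ToyFlowFile(id, Map.copyOf(copy));
        }
    }

    static class ToySession {
        private long nextId = 1;
        private final Set<Long> unaccounted = new TreeSet<>(); // ids not yet transferred or removed

        ToyFlowFile get() { return track(new ToyFlowFile(nextId++, Map.of())); }

        // Child of 'parent': new id, inherited attributes (content is not modeled).
        ToyFlowFile create(ToyFlowFile parent) { return track(new ToyFlowFile(nextId++, parent.attributes())); }

        ToyFlowFile putAttribute(ToyFlowFile ff, String key, String value) { return ff.withAttribute(key, value); }

        // Routing to the named relationship is not modeled; only the accounting is.
        void transfer(ToyFlowFile ff, String relationship) { unaccounted.remove(ff.id()); }

        void remove(ToyFlowFile ff) { unaccounted.remove(ff.id()); }

        void commit() {
            if (!unaccounted.isEmpty()) {
                throw new IllegalStateException("Cannot commit session because the following"
                        + " FlowFiles have not been removed or transferred: " + unaccounted);
            }
        }

        private ToyFlowFile track(ToyFlowFile ff) { unaccounted.add(ff.id()); return ff; }
    }

    // Mirrors the suspected slip: the second putAttribute() starts from 'document'.
    static String buggyRun() {
        ToySession session = new ToySession();
        ToyFlowFile original = session.get();
        ToyFlowFile document = session.create(original);
        ToyFlowFile concepts = session.create(original);
        document = session.putAttribute(document, "embedded-document", "uuid-1");
        concepts = session.putAttribute(document, "embedded-document", "uuid-1"); // now same id as document
        session.transfer(document, "DOCUMENT");
        session.transfer(concepts, "CONCEPTS");
        session.remove(original);
        try {
            session.commit();
            return "committed";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Fixed: each child keeps its own reference, and the failure path removes the
    // children it created before routing the original to FAILURE.
    static String fixedRun(boolean failMidway) {
        ToySession session = new ToySession();
        ToyFlowFile original = session.get();
        ToyFlowFile document = session.create(original);
        ToyFlowFile concepts = session.create(original);
        try {
            if (failMidway) {
                throw new RuntimeException("simulated processing failure");
            }
            document = session.putAttribute(document, "embedded-document", "uuid-1");
            concepts = session.putAttribute(concepts, "embedded-document", "uuid-1");
            session.transfer(document, "DOCUMENT");
            session.transfer(concepts, "CONCEPTS");
            session.remove(original);
        } catch (RuntimeException e) {
            session.remove(document);
            session.remove(concepts);
            session.transfer(original, "FAILURE");
        }
        session.commit();
        return "committed";
    }

    public static void main(String[] args) {
        System.out.println(buggyRun());
        System.out.println(fixedRun(false));
        System.out.println(fixedRun(true));
    }
}
```

Running `main` first prints the commit error for the buggy version. That error names the orphaned concepts child, `[3]` in this model. It then prints `committed` twice, once for the success path and once for the failure path of the fixed version.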
>>
>>
>>> Thanks,
>>> Russ
>>>
>>>
>>> @Override
>>> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>>> {
>>>   FlowFile flowfile = session.get();
>>>
>>>   if( flowfile == null )
>>>   {
>>>     context.yield();
>> No need to yield here. Let the framework handle the scheduling.
>> ProcessContext.yield() is meant for cases where you’re communicating with
>> some external service, for instance, and you know the service is
>> unavailable or rate limiting you or something like that. You can’t make
>> any progress, so tell NiFi to not bother wasting CPU cycles with this
>> Processor.
>>>     return;
>>>   }
>>>
>>>   try
>>>   {
>>>     final String UUID = flowfile.getAttribute( NiFiUtilities.UUID );
>>>
>>>     FlowFile document = session.clone( flowfile );
>>>
>>>     // excerpt and write the embedded document to a new flowfile...
>>>     session.write( document, new OutputStreamCallback()
>>>     {
>>>       @Override public void process( OutputStream outputStream )
>>>       {
>>>         // read from the original flowfile copying to the output flowfile...
>>>         session.read( flowfile, new InputStreamCallback()
>>>         {
>>>           @Override public void process( InputStream inputStream ) throws IOException
>>>           {
>>>             ...
>>>           }
>>>         } );
>>>       }
>>>     } );
>>>
>>>     FlowFile concepts = session.clone( flowfile );
>>>
>>>     AtomicReference< ConceptList > conceptListHolder = new AtomicReference<>();
>>>
>>>     // parse the concepts into a POJO list...
>>>     session.read( concepts, new InputStreamCallback()
>>>     {
>>>       final ConceptList conceptList = conceptListHolder.get();
>>>
>>>       @Override public void process( InputStream inputStream ) throws IOException
>>>       {
>>>         ...
>>>       }
>>>     } );
>>>
>>>     // write out the concept POJOs serialized...
>>>     session.write( concepts, new OutputStreamCallback()
>>>     {
>>>       @Override public void process( OutputStream outputStream )
>>>       {
>>>         ...
>>>       }
>>>     } );
>> At this point, you’ve written to the ‘document’ flowfile once, written to
>> the ‘concepts’ flowfile once and read the original FlowFile twice (well,
>> read the original flowfile once and read the clone once, which amounts to
>> the same thing).
>> You could instead do something like:
>>
>> FlowFile document = session.create(flowFile);
>> FlowFile concepts = session.create(flowFile);
>>
>> try (final InputStream input = session.read(flowFile)) {
>>     try (final OutputStream documentOut = session.write(document);
>>          final OutputStream conceptOut = session.write(concepts)) {
>>
>>         // Perform processing.
>>
>>     }
>> }
>>
>> In this way, you avoid reading the input FlowFile twice. Of course, you
>> provided an abstraction of the code, so it’s possible that this won’t
>> actually work, depending on what you’re doing to read the input...
>>
>>>     document = session.putAttribute( document, "embedded-document", UUID );
>>>     concepts = session.putAttribute( document, "embedded-document", UUID );
>>>     session.transfer( document, DOCUMENT );
>>>     session.transfer( concepts, CONCEPTS );
>>>     session.remove( flowfile );
>>>   }
>>>   catch( Exception e )
>>>   {
>>>     session.transfer( flowfile, FAILURE );
>>>   }
>>> }
>>>
>>> On 8/24/20 4:52 PM, Matt Burgess wrote:
>>>> Russell,
>>>>
>>>> session.read() won't overwrite any contents of the incoming flow file,
>>>> but write() will. For #2, are you doing any processing on the file? If
>>>> not, wouldn't that be the original flowfile anyway? Or do you want it
>>>> to be a different flowfile on purpose (so you can send the incoming
>>>> flowfile to a different relationship)? You can use session.clone() to
>>>> create a new flowfile that has the same content and attributes as
>>>> the incoming flowfile, then handle that separately from the incoming
>>>> (original) flowfile.
>>>> For #1, you could clone() the original flowfile
>>>> and do the read/process/write as part of a session.write(FlowFile,
>>>> StreamCallback) call. Then you're technically reading the "new" file
>>>> content (which is the same, of course) and overwriting it on the way
>>>> out.
>>>>
>>>> Regards,
>>>> Matt
>>>>
>>>> On Mon, Aug 24, 2020 at 6:37 PM Russell Bateman <r...@windofkeltia.com> wrote:
>>>>> I am writing a custom processor that, upon processing a flowfile,
>>>>> results in two new flowfiles (neither keeping the exact, original
>>>>> content) routed out two different relationships. I might like to route the
>>>>> original flowfile to a separate relationship.
>>>>>
>>>>> FlowFile original = session.get();
>>>>>
>>>>> Do I need to call session.create() for the two new files?
>>>>>
>>>>> 1. session.read() of the original file's contents, not all of the way
>>>>> through, but send the processed output from what I do read as
>>>>> flowfile 1.
>>>>> 2. session.read() of the original file's contents and send resulting output
>>>>> as flowfile 2.
>>>>> 3. session.transfer() of the original flowfile.
>>>>>
>>>>> I look at all of these session.read() and session.write() calls and I'm a
>>>>> bit confused as to which to use that won't lose the original flowfile's
>>>>> content after #1 so I can start over again in #2.
>>>>>
>>>>> Thanks.
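Mark's single-pass suggestion comes down to consuming one input stream while writing two output streams. It also covers the one-to-many case of the read()/write() synthesis Russ wished for.

The sketch below shows that pattern in plain Java, so it runs without NiFi. In-memory `ByteArrayInputStream`/`ByteArrayOutputStream` objects stand in for the streams that `session.read(flowFile)` and `session.write(...)` return in Mark's snippet.

The input format, the marker line `--- embedded document ---`, and the names `split` and `splitText` are all hypothetical. Russ's real format is elided in the thread. The assumed format is some concept lines, then the marker line, then the embedded document.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Plain-Java sketch (no NiFi classes): read one input once, write two outputs as you go.
public class SinglePassSplitSketch {

    // Hypothetical separator between the concept lines and the embedded document.
    static final String MARKER = "--- embedded document ---";

    // One pass over 'input': lines before MARKER go to conceptOut, lines after it to
    // documentOut. Line endings are normalized to "\n".
    static void split(InputStream input, OutputStream documentOut, OutputStream conceptOut)
            throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
        Writer document = new OutputStreamWriter(documentOut, StandardCharsets.UTF_8);
        Writer concepts = new OutputStreamWriter(conceptOut, StandardCharsets.UTF_8);
        boolean inDocument = false;
        String line;
        while ((line = reader.readLine()) != null) {
            if (!inDocument && line.equals(MARKER)) {
                inDocument = true; // the marker itself is copied to neither output
                continue;
            }
            (inDocument ? document : concepts).write(line + "\n");
        }
        // Flush, don't close: closing a Writer would also close the caller's streams.
        document.flush();
        concepts.flush();
    }

    // In-memory convenience wrapper; returns { document, concepts }.
    static String[] splitText(String text) {
        ByteArrayOutputStream documentOut = new ByteArrayOutputStream();
        ByteArrayOutputStream conceptOut = new ByteArrayOutputStream();
        try (InputStream input = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8))) {
            split(input, documentOut, conceptOut);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String[] {
            documentOut.toString(StandardCharsets.UTF_8),
            conceptOut.toString(StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) {
        String[] parts = splitText("concept A\nconcept B\n" + MARKER + "\n<doc/>\n");
        System.out.print("document:\n" + parts[0]);
        System.out.print("concepts:\n" + parts[1]);
    }
}
```

`split` flushes its writers but leaves the streams open. Closing an `OutputStreamWriter` closes the stream underneath it, and that stream belongs to the caller (in Mark's snippet, the try-with-resources block). As Mark notes, whether a single pass is enough depends on how the real input has to be parsed.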