You should be able to call write as many times as you need. Just keep passing the flowfile reference returned by each call into the next call.
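
For the underlying "read the same input twice, write one output" part of the question, here is a minimal plain-Java sketch. It is an illustration only, not NiFi code: StreamSource, twoPass, demo and the fixed byte offset splitAt are hypothetical names made up for this example, and the upper-casing merely stands in for the SAX-processed second half. In a processor, the reopenable source would presumably be session.read(flowfile), as mentioned earlier in the thread; that wiring is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class TwoPassCopy {

    /** Hypothetical stand-in for anything that can reopen the same content from the start. */
    interface StreamSource {
        InputStream open() throws IOException;
    }

    /** Streams the source twice into one output, never holding the whole content in memory. */
    static void twoPass(StreamSource source, OutputStream out, long splitAt) throws IOException {
        byte[] buf = new byte[8192];

        // Pass 1: copy the first splitAt bytes as-is, then close this input stream.
        try (InputStream in = source.open()) {
            long remaining = splitAt;
            while (remaining > 0) {
                int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                if (n < 0) break;               // source shorter than splitAt
                out.write(buf, 0, n);
                remaining -= n;
            }
        }

        // Pass 2: reopen the source, skip the part already copied, process the rest.
        try (InputStream in = source.open()) {
            long toSkip = splitAt;
            while (toSkip > 0) {
                int n = in.read(buf, 0, (int) Math.min(buf.length, toSkip));
                if (n < 0) break;
                toSkip -= n;
            }
            int n;
            while ((n = in.read(buf)) >= 0) {
                for (int i = 0; i < n; i++) {
                    // Placeholder "processing": upper-case ASCII letters.
                    if (buf[i] >= 'a' && buf[i] <= 'z') buf[i] -= 32;
                }
                out.write(buf, 0, n);
            }
        }
    }

    /** Small in-memory demo; real input would come from a file or flowfile instead. */
    static String demo(String content, int splitAt) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            twoPass(() -> new ByteArrayInputStream(bytes), out, splitAt);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

For example, TwoPassCopy.demo("head|body", 5) returns "head|BODY": the first five bytes are copied untouched and only the remainder is processed. The one output stream stays open across both passes, while each input stream is opened and closed separately.
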

On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <[email protected]> wrote:

> Mike,
>
> Many thanks for responding. Do you mean to say that all I have to do is
> something like this?
>
>     public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>     {
>       FlowFile flowfile = session.get();
>       ...
>
>       // this will be our resulting flowfile...
>       AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>
>       /* Do some processing on the in-coming flowfile then close its input stream, but
>        * save the output stream for continued use.
>        */
>       session.write( flowfile, new StreamCallback()
>       {
>         @Override
>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>         {
>           savedOutputStream.set( outputStream );
>           ...
>
>           // processing puts some output on the output stream...
>           outputStream.write( etc. );
>
>           inputStream.close();
>         }
>       } );
>
>       /* Start over doing different processing on the (same/reopened) in-coming flowfile
>        * continuing to use the original output stream. It's our responsibility to close
>        * the saved output stream, NiFi closes the unused output stream opened, but
>        * ignored by us.
>        */
>       session.write( flowfile, new StreamCallback()
>       {
>         @Override
>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>         {
>           outputStream = savedOutputStream.get(); // (discard the new output stream)
>           ...
>
>           // processing puts (some more) output on the original output stream...
>           outputStream.write( etc. );
>
>           outputStream.close();
>         }
>       } );
>
>       session.transfer( flowfile, etc. );
>     }
>
> I'm wondering if this will work to "discard" the new output stream
> opened for me (the second time) and replace it with the original one,
> which was probably closed when the first call to session.write()
> finished. What's on these streams is way too big for me to put them
> into temporary memory, say, a ByteArrayOutputStream.
>
> Russ
>
> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> > session.read(FlowFile) just gives you an InputStream. You should be able
> > to rerun that as many times as you want provided you properly close it.
> >
> > On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <[email protected]>
> > wrote:
> >
> >> In my custom processor, I'm using a SAX parser to process an incoming
> >> flowfile that's in XML. Except that this particular XML is in essence
> >> two different files, and I would like to split, read and process the
> >> first "half" (which starts a couple of lines, i.e. XML elements, into
> >> the file) without using the SAX parser. At the end, I would stream the
> >> output of the first half, then the SAX-processed second half.
> >>
> >> So, in short:
> >>
> >> 1. process the incoming flowfile for the early content not using SAX,
> >>    but merely copying as-is; at all cost I must avoid "reassembling"
> >>    the first half using my SAX handler (what I'm doing now),
> >> 2. output the first part down the output stream to the resulting
> >>    flowfile,
> >> 3. (re)process the incoming flowfile using SAX (and I can just skip
> >>    over the first bit), spitting the result of this second part out
> >>    down the output stream of the resulting flowfile.
> >>
> >> I guess this is tantamount to asking how, in Java, I can read an input
> >> stream twice (or one-half plus one times). Maybe it's less a NiFi
> >> developer question and more a Java question. I have looked at it that
> >> way too, but, if one of you knows (particularly NiFi) best practice, I
> >> would very much like to hear about it.
> >>
> >> Thanks.
