Mike,
Many thanks for responding. Do you mean to say that all I have to do is
something like this?
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  ...

  // this will be our resulting flowfile...
  AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();

  /* Do some processing on the in-coming flowfile then close its input stream, but
   * save the output stream for continued use.
   */
  flowfile = session.write( flowfile, new StreamCallback()
  {
    @Override
    public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      savedOutputStream.set( outputStream );
      ...

      // processing puts some output on the output stream...
      outputStream.write( etc. );
      inputStream.close();
    }
  } );

  /* Start over doing different processing on the (same/reopened) in-coming flowfile,
   * continuing to use the original output stream. It's our responsibility to close
   * the saved output stream; NiFi closes the unused output stream it opened, but
   * that we ignored.
   */
  flowfile = session.write( flowfile, new StreamCallback()
  {
    @Override
    public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      outputStream = savedOutputStream.get(); // (discard the new output stream)
      ...

      // processing puts (some more) output on the original output stream...
      outputStream.write( etc. );
      outputStream.close();
    }
  } );

  session.transfer( flowfile, etc. );
}
I'm wondering if this will work to "discard" the new output stream
opened for me (the second time) and replace it with the original one,
which was probably closed when the first call to session.write()
finished. What's on these streams is way too big for me to hold in
memory in, say, a ByteArrayOutputStream.
Russ
On 3/27/20 10:03 AM, Mike Thomsen wrote:
session.read(FlowFile) just gives you an InputStream. You should be able to
rerun that as many times as you want provided you properly close it.
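A minimal sketch of that idea in plain Java, outside NiFi. The Supplier< InputStream > here is a hypothetical stand-in for whatever reopens the content (in a processor, presumably something like () -> session.read( flowfile )); the class and method names are made up for illustration. Each pass opens a fresh stream, reads it to the end without buffering the content, and closes it before the next pass begins.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import java.util.zip.CRC32;

class RereadDemo
{
  /** Streams one freshly opened pass over the source and returns a checksum of what was read. */
  static long checksumOnePass( Supplier< InputStream > source ) throws IOException
  {
    CRC32 crc = new CRC32();
    byte[] buffer = new byte[ 8192 ];

    // try-with-resources closes this pass's stream before the next one is opened
    try ( InputStream in = source.get() )
    {
      int n;
      while ( ( n = in.read( buffer ) ) != -1 )
        crc.update( buffer, 0, n );
    }
    return crc.getValue();
  }

  public static void main( String[] args ) throws IOException
  {
    byte[] content = "<doc>example</doc>".getBytes( StandardCharsets.UTF_8 );

    // Assumption: stands in for reopening the flowfile content on each call
    Supplier< InputStream > source = () -> new ByteArrayInputStream( content );

    long first  = checksumOnePass( source );
    long second = checksumOnePass( source );
    System.out.println( first == second );   // both passes saw the same bytes
  }
}
```

Only the fixed-size buffer is held in memory, so the size of the content does not matter for this pattern.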
On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman wrote:
In my custom processor, I'm using a SAX parser to process an incoming
flowfile that's in XML. However, this particular XML is in essence two
different files, and I would like to split it, reading and processing the
first "half" (which starts a couple of lines, that is, XML elements, into
the file) without using the SAX parser. At the end, I would stream the
output of the first half, then the SAX-processed second half.
So, in short:
1. process the incoming flowfile for the early content not using SAX,
but merely copying as-is; at all costs I must avoid "reassembling"
the first half using my SAX handler (what I'm doing now),
2. output the first part down the output stream to the resulting flowfile,
3. (re)process the incoming flowfile using SAX (and I can just skip
over the first bit) and spit the result of this second part out
down the output stream of the resulting flowfile.
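The three steps above could be sketched roughly as follows, in plain Java without NiFi. Everything specific here is an assumption made for illustration: the element names first, second and item, the line-oriented layout of the first part, the "item=" output format, and the Supplier< InputStream > that stands in for reopening the flowfile content. The Writer stands in for the resulting flowfile's output stream.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

class SplitXmlSketch
{
  /** Steps 1 and 2: copy the lines from <first> through </first> verbatim, without SAX. */
  static void copyFirstPart( InputStream in, Writer out ) throws IOException
  {
    BufferedReader reader = new BufferedReader( new InputStreamReader( in, StandardCharsets.UTF_8 ) );
    boolean copying = false;
    String line;
    while ( ( line = reader.readLine() ) != null )
    {
      if ( line.contains( "<first>" ) )
        copying = true;
      if ( copying )
        out.write( line + "\n" );
      if ( line.contains( "</first>" ) )
        break;   // stop here; the second part is left to the SAX pass
    }
  }

  /** Step 3: SAX-parse the whole document, ignoring everything outside <second>. */
  static void processSecondPart( InputStream in, Writer out ) throws Exception
  {
    DefaultHandler handler = new DefaultHandler()
    {
      private boolean inSecond = false;
      private final StringBuilder text = new StringBuilder();

      @Override
      public void startElement( String uri, String localName, String qName, Attributes attributes )
      {
        if ( qName.equals( "second" ) )
          inSecond = true;
        text.setLength( 0 );   // start collecting text for the element just opened
      }

      @Override
      public void characters( char[] ch, int start, int length )
      {
        if ( inSecond )
          text.append( ch, start, length );
      }

      @Override
      public void endElement( String uri, String localName, String qName ) throws SAXException
      {
        if ( qName.equals( "second" ) )
          inSecond = false;
        else if ( inSecond && qName.equals( "item" ) )
        {
          try
          {
            out.write( "item=" + text.toString().trim() + "\n" );
          }
          catch ( IOException e )
          {
            throw new SAXException( e );
          }
        }
      }
    };
    SAXParserFactory.newInstance().newSAXParser().parse( in, handler );
  }

  /** Opens the source twice, once per pass, writing both results to the same output. */
  static String run( Supplier< InputStream > source ) throws Exception
  {
    StringWriter out = new StringWriter();   // in-memory only to keep the sketch self-contained
    try ( InputStream in = source.get() ) { copyFirstPart( in, out ); }
    try ( InputStream in = source.get() ) { processSecondPart( in, out ); }
    return out.toString();
  }

  public static void main( String[] args ) throws Exception
  {
    String xml = "<doc>\n<meta/>\n<first>\n  raw &amp; text\n</first>\n"
               + "<second>\n  <item>a</item>\n  <item>b</item>\n</second>\n</doc>\n";
    byte[] bytes = xml.getBytes( StandardCharsets.UTF_8 );
    System.out.print( run( () -> new ByteArrayInputStream( bytes ) ) );
  }
}
```

The point of the design is that the first part never passes through the SAX handler, so it reaches the output byte-for-byte as written (entities such as &amp; included), while the SAX pass simply ignores events until the second part begins.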
I guess this is tantamount to asking how, in Java, I can read an input
stream twice (or one-half plus one times). Maybe it's less a NiFi
developer question and more a Java question. I have looked at it that
way too, but, if one of you knows (particularly NiFi) best practice, I
would very much like to hear about it.
Thanks.