Joe,
Ah, thanks. I think I have learned a lot today about what's going on down
inside session.read()/write(). I don't have to stand on my head. For
completeness, in case anyone else looks for this answer, here's my amended code:
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  ...

  // Do some processing on the in-coming flowfile then close its input stream
  // (StreamCallback, not InputStreamCallback, is the callback that receives both streams)
  flowfile = session.write( flowfile, new StreamCallback()
  {
    @Override
    public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      ...
      // processing puts some output on the output stream...
      outputStream.write( etc. );
      inputStream.close();
    }
  } );

  // Start over doing different processing on the (same/reopened) in-coming flowfile
  // continuing to use the (same, also reopened, but appended to) output stream.
  flowfile = session.write( flowfile, new StreamCallback()
  {
    @Override
    public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      ...
      // processing puts (some more) output on the flowfile's output stream...
      outputStream.write( etc. );
    }
  } );

  session.transfer( flowfile, etc. );
}
As I'm fond of saying, NiFi just rocks because there's always a solution!
Russ
On 3/27/20 3:08 PM, Joe Witt wrote:
You should be able to call write as many times as you need. Just keep
passing the resulting flowfile reference into the next call.
On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <[email protected]>
wrote:
Mike,
Many thanks for responding. Do you mean to say that all I have to do is
something like this?
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  ...

  // this will be our resulting flowfile...
  AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();

  /* Do some processing on the in-coming flowfile then close its input stream, but
   * save the output stream for continued use.
   */
  session.write( flowfile, new InputStreamCallback()
  {
    @Override
    public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      savedOutputStream.set( outputStream );
      ...
      // processing puts some output on the output stream...
      outputStream.write( etc. );
      inputStream.close();
    }
  } );

  /* Start over doing different processing on the (same/reopened) in-coming flowfile
   * continuing to use the original output stream. It's our responsibility to close
   * the saved output stream, NiFi closes the unused output stream opened, but
   * ignored by us.
   */
  session.write( flowfile, new StreamCallback()
  {
    @Override
    public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      outputStream = savedOutputStream.get(); // (discard the new output stream)
      ...
      // processing puts (some more) output on the original output stream...
      outputStream.write( etc. );
      outputStream.close();
    }
  } );

  session.transfer( flowfile, etc. );
}
I'm wondering if this will work to "discard" the new output stream
opened for me (the second time) and replace it with the original one,
which was probably closed when the first call to session.write()
finished. What's on these streams is way too big for me to hold in
temporary memory, say, in a ByteArrayOutputStream.
Russ
On 3/27/20 10:03 AM, Mike Thomsen wrote:
session.read(FlowFile) just gives you an InputStream. You should be able
to rerun that as many times as you want provided you properly close it.
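
A minimal plain-Java sketch of that idea, for anyone reading later. It does
not use the NiFi API at all: StreamSource is a hypothetical stand-in for
"something that hands out a fresh InputStream on each call" (the role
session.read(flowfile) plays above), and a temporary file stands in for the
flowfile content. Each pass opens its own stream and closes it with
try-with-resources before the next pass starts.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ReopenDemo {
    /** Hypothetical stand-in for a re-openable source; not a NiFi type. */
    interface StreamSource {
        InputStream open() throws IOException;
    }

    /** Pass 1: read only the first line, then close the stream. */
    static String firstLine(StreamSource source) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(source.open(), StandardCharsets.UTF_8))) {
            return reader.readLine();
        }
    }

    /** Pass 2: open a fresh stream and count every byte from the start. */
    static long countBytes(StreamSource source) throws IOException {
        long total = 0;
        byte[] buffer = new byte[8192]; // fixed-size buffer, content is never held whole
        try (InputStream in = source.open()) {
            int n;
            while ((n = in.read(buffer)) != -1) {
                total += n;
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("reopen-demo", ".txt");
        try {
            Files.write(file, "first\nsecond\n".getBytes(StandardCharsets.UTF_8));
            StreamSource source = () -> Files.newInputStream(file);
            System.out.println(firstLine(source));  // first pass
            System.out.println(countBytes(source)); // second pass, starts over at byte 0
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

The point of the design is that nothing is rewound or buffered: the second
pass simply asks the source for a new stream.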
On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <[email protected]>
wrote:
In my custom processor, I'm using a SAX parser to process an incoming
flowfile that's in XML. However, this particular XML is in essence
two different files, and I would like to split it: read and process the
first "half" (which starts a couple of lines, i.e. XML elements, into the
file) without using the SAX parser. At the end, I would stream the output
of the first half, then the SAX-processed second half.
So, in short:
1. process the incoming flowfile for the early content not using SAX,
but merely copying as-is; at all cost I must avoid "reassembling"
the first half using my SAX handler (what I'm doing now),
2. output the first part down the output stream to the resulting
flowfile,
3. (re)process the incoming flowfile using SAX (and I can just skip
over the first bit) and spit the result of this second part out
down the output stream of the resulting flowfile.
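
The three steps above can be sketched in plain Java, outside NiFi, roughly as
follows. Everything specific here is an assumption made up for illustration:
the element names raw, data and item, the idea that the as-is portion sits
between <raw> and </raw> lines, and the "item=" output format. A file path
stands in for the incoming flowfile and a Writer for the resulting flowfile's
output stream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

public class TwoPassSketch {
    /** Steps 1 and 2: copy the lines between the (hypothetical) <raw> and </raw> lines as-is. */
    static void copyRawPart(Path in, Writer out) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(in, StandardCharsets.UTF_8)) {
            boolean inside = false;
            String line;
            while ((line = reader.readLine()) != null) {
                String trimmed = line.trim();
                if (trimmed.equals("</raw>")) break;      // end of the as-is portion
                if (inside) out.write(line + "\n");       // verbatim copy, no parsing
                if (trimmed.equals("<raw>")) inside = true;
            }
        }
    }

    /** Step 3: reopen the input, SAX-parse it, and emit only <item> text found under <data>. */
    static void saxPart(Path in, Writer out) throws Exception {
        DefaultHandler handler = new DefaultHandler() {
            private boolean inData;
            private StringBuilder text; // non-null only while inside an <item>

            @Override
            public void startElement(String uri, String localName, String qName, Attributes atts) {
                if (qName.equals("data")) inData = true;
                else if (inData && qName.equals("item")) text = new StringBuilder();
            }

            @Override
            public void characters(char[] ch, int start, int length) {
                if (text != null) text.append(ch, start, length);
            }

            @Override
            public void endElement(String uri, String localName, String qName) throws SAXException {
                if (qName.equals("data")) {
                    inData = false;
                } else if (text != null && qName.equals("item")) {
                    try {
                        out.write("item=" + text + "\n");
                    } catch (IOException e) {
                        throw new SAXException(e);
                    }
                    text = null;
                }
            }
        };
        try (InputStream stream = Files.newInputStream(in)) {
            SAXParserFactory.newInstance().newSAXParser().parse(stream, handler);
        }
    }

    /** Runs both passes against the same input, writing to one shared output. */
    static void process(Path in, Writer out) throws Exception {
        copyRawPart(in, out);
        saxPart(in, out);
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("two-pass", ".xml");
        try {
            String xml = "<doc>\n<raw>\n<a>keep me</a>\n</raw>\n"
                       + "<data>\n<item>x</item>\n<item>y</item>\n</data>\n</doc>\n";
            Files.write(file, xml.getBytes(StandardCharsets.UTF_8));
            StringWriter out = new StringWriter(); // demo only; real output would be a stream
            process(file, out);
            System.out.print(out);
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

The SAX handler never sees a request to rebuild the first part; it just
ignores everything outside <data>, which is the "skip over the first bit"
in step 3.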
I guess this is tantamount to asking how, in Java, I can read an input
stream twice (or one-half plus one times). Maybe it's less a NiFi
developer question and more a Java question. I have looked at it that
way too, but, if one of you knows (particularly NiFi) best practice, I
would very much like to hear about it.
Thanks.