I prefer to avoid the callbacks and do this:

InputStream is = session.read(flowFile);
.... some code....
is.close();
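
A minimal sketch of the two-pass idea in plain Java, runnable without NiFi. Everything named here is a hypothetical stand-in for illustration only: StreamOpener.open() plays the role of session.read(flowFile), the fixed headerLength is an assumed way of knowing where the first "half" ends, and the upper-casing in the second pass is just a placeholder for the SAX processing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for session.read(flowFile): every call opens a fresh stream. */
interface StreamOpener {
    InputStream open() throws IOException;
}

final class ReadTwiceSketch {

    /** Reads up to count bytes from in; copies them to out unless out is null. */
    static void consume(InputStream in, OutputStream out, int count) throws IOException {
        byte[] buffer = new byte[4096];
        int remaining = count;
        while (remaining > 0) {
            int n = in.read(buffer, 0, Math.min(buffer.length, remaining));
            if (n < 0) {
                break; // content was shorter than expected
            }
            if (out != null) {
                out.write(buffer, 0, n);
            }
            remaining -= n;
        }
    }

    static void process(StreamOpener source, int headerLength, OutputStream out) throws IOException {
        // Pass 1: copy the first part as-is; try-with-resources closes the stream.
        try (InputStream in = source.open()) {
            consume(in, out, headerLength);
        }
        // Pass 2: reopen the same content, skip the first part, transform the rest.
        try (InputStream in = source.open()) {
            consume(in, null, headerLength);
            int b;
            while ((b = in.read()) != -1) {
                out.write(Character.toUpperCase(b)); // ASCII-only placeholder for SAX processing
            }
        }
    }

    /** Convenience wrapper for trying the sketch on an in-memory ASCII string. */
    static String run(String content, int headerLength) {
        byte[] bytes = content.getBytes(StandardCharsets.US_ASCII);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            process(() -> new ByteArrayInputStream(bytes), headerLength, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(run("<head/>body", 7)); // prints <head/>BODY
    }
}
```

In a processor, the in-memory opener would presumably be replaced by something that reopens the flowfile content, and out would be the output stream of the resulting flowfile.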

On Fri, Mar 27, 2020 at 5:22 PM Russell Bateman <r...@windofkeltia.com>
wrote:

> Joe,
>
> Ah, thanks. I think I have learned a lot about what's going on down
> inside session.read/write() today. I don't have to stand on my head. For
> completeness, if anyone else looks for this answer, here's my amended code:
>
> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> {
>    FlowFile flowfile = session.get();
>    ...
>
>    // Do some processing on the incoming flowfile, then close its input stream
>    flowfile = session.write( flowfile, new StreamCallback()
>    {
>      @Override
>      public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>      {
>        ...
>
>        // processing puts some output on the output stream...
>        outputStream.write( etc. );
>
>        inputStream.close();
>      }
>    } );
>
>    // Start over doing different processing on the (same/reopened) incoming flowfile,
>    // continuing to use the (same, also reopened, but appended to) output stream.
>    flowfile = session.write( flowfile, new StreamCallback()
>    {
>      @Override
>      public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>      {
>        ...
>
>        // processing puts (some more) output on the flowfile's output stream...
>        outputStream.write( etc. );
>      }
>    } );
>
>    session.transfer( flowfile, etc. );
> }
>
> As I'm fond of saying, NiFi just rocks because there's always a solution!
>
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
> > You should be able to call write as many times as you need. Just keep
> > passing the resulting flowfile reference into the next call.
> >
> > On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <r...@windofkeltia.com>
> > wrote:
> >
> >> Mike,
> >>
> >> Many thanks for responding. Do you mean to say that all I have to do is
> >> something like this?
> >>
> >>      public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> >>      {
> >>         FlowFile flowfile = session.get();
> >>         ...
> >>
> >>         // this will be our resulting flowfile...
> >>         AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
> >>
> >>         /* Do some processing on the incoming flowfile then close its input stream, but
> >>          * save the output stream for continued use.
> >>          */
> >>         session.write( flowfile, new StreamCallback()
> >>         {
> >>           @Override
> >>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> >>           {
> >>             savedOutputStream.set( outputStream );
> >>             ...
> >>
> >>             // processing puts some output on the output stream...
> >>             outputStream.write( etc. );
> >>
> >>             inputStream.close();
> >>           }
> >>         } );
> >>
> >>         /* Start over doing different processing on the (same/reopened) incoming flowfile
> >>          * continuing to use the original output stream. It's our responsibility to close
> >>          * the saved output stream; NiFi closes the unused output stream opened, but
> >>          * ignored by us.
> >>          */
> >>         session.write( flowfile, new StreamCallback()
> >>         {
> >>           @Override
> >>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> >>           {
> >>             outputStream = savedOutputStream.get(); // (discard the new output stream)
> >>             ...
> >>
> >>             // processing puts (some more) output on the original output stream...
> >>             outputStream.write( etc. );
> >>
> >>             outputStream.close();
> >>           }
> >>         } );
> >>
> >>         session.transfer( flowfile, etc. );
> >>      }
> >>
> >> I'm wondering if this will work to "discard" the new output stream
> >> opened for me (the second time) and replace it with the original one,
> >> which was probably closed when the first call to
> >> session.write() finished. What's on these streams is way too big for me
> >> to put them into temporary memory, say, a ByteArrayOutputStream.
> >>
> >> Russ
> >>
> >> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> >>> session.read(FlowFile) just gives you an InputStream. You should be able to
> >>> rerun that as many times as you want provided you properly close it.
> >>>
> >>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <r...@windofkeltia.com>
> >>> wrote:
> >>>
> >>>> In my custom processor, I'm using a SAX parser to process an incoming
> >>>> flowfile that's in XML. However, this particular XML is in essence
> >>>> two different files, and I would like to split it, then read and process
> >>>> the first "half" (which starts a couple of lines, i.e. XML elements, into
> >>>> the file) without using the SAX parser. At the end, I would stream the
> >>>> output of the first half, then the SAX-processed second half.
> >>>>
> >>>> So, in short:
> >>>>
> >>>>    1. process the incoming flowfile for the early content not using SAX,
> >>>>       but merely copying as-is; at all costs I must avoid "reassembling"
> >>>>       the first half using my SAX handler (what I'm doing now),
> >>>>    2. output the first part down the output stream to the resulting flowfile,
> >>>>    3. (re)process the incoming flowfile using SAX (and I can just skip
> >>>>       over the first bit), spitting the result of this second part out
> >>>>       down the output stream of the resulting flowfile.
> >>>>
> >>>> I guess this is tantamount to asking how, in Java, I can read an input
> >>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
> >>>> developer question and more a Java question. I have looked at it that
> >>>> way too, but, if one of you knows (particularly NiFi) best practice, I
> >>>> would very much like to hear about it.
> >>>>
> >>>> Thanks.
> >>>>
> >>>>
> >>
>
>
