Joe,

Ah, thanks. I think I have learned a lot about what's going on down inside session.read/write() today. I don't have to stand on my head. For completeness, in case anyone else looks for this answer, here's my amended code:

public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  ...

  // Do some processing on the in-coming flowfile then close its input stream
  flowfile = session.write( flowfile, new StreamCallback()
  {
    @Override
    public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      ...

      // processing puts some output on the output stream...
      outputStream.write( etc. );

      inputStream.close();
    }
  } );

  // Start over doing different processing on the (same/reopened) in-coming flowfile
  // continuing to use the (same, also reopened, but appended to) output stream.
  flowfile = session.write( flowfile, new StreamCallback()
  {
    @Override
    public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      ...

      // processing puts (some more) output on the flowfile's output stream...
      outputStream.write( etc. );
    }
  } );

  session.transfer( flowfile, etc. );
}

As I'm fond of saying, NiFi just rocks because there's always a solution!

Russ

On 3/27/20 3:08 PM, Joe Witt wrote:
You should be able to call write as many times as you need. Just keep
using the resulting flowfile reference in the next call.

On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <[email protected]>
wrote:

Mike,

Many thanks for responding. Do you mean to say that all I have to do is
something like this?

     public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
     {
        FlowFile flowfile = session.get();
        ...

        // this will be our resulting flowfile...
        AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();

        /* Do some processing on the in-coming flowfile then close its input stream, but
         * save the output stream for continued use.
         */
        session.write( flowfile, new InputStreamCallback()
        {
          @Override
          public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
          {
            savedOutputStream.set( outputStream );
            ...

            // processing puts some output on the output stream...
            outputStream.write( etc. );

            inputStream.close();
          }
        } );

        /* Start over doing different processing on the (same/reopened) in-coming flowfile
         * continuing to use the original output stream. It's our responsibility to close
         * the saved output stream, NiFi closes the unused output stream opened, but
         * ignored by us.
         */
        session.write( flowfile, new StreamCallback()
        {
          @Override
          public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
          {
            outputStream = savedOutputStream.get(); // (discard the new output stream)
            ...

            // processing puts (some more) output on the original output stream...
            outputStream.write( etc. );

            outputStream.close();
          }
        } );

        session.transfer( flowfile, etc. );
     }

I'm wondering if this will work to "discard" the new output stream
opened for me (the second time) and replace it with the original one
which was probably closed when the first call to
session.write() finished. What's on these streams is way too big for me
to put them into temporary memory, say, a ByteArrayOutputStream.
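
To illustrate the concern about the saved stream, here is a minimal plain-Java sketch (no NiFi API involved). The write() helper and its Callback interface are hypothetical stand-ins for a framework call that opens a stream, runs the callback and closes the stream afterwards; whether NiFi behaves exactly this way is an assumption, not something verified here.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicReference;

final class SavedStreamSketch
{
  // Hypothetical callback type, loosely modelled on a stream callback.
  interface Callback
  {
    void process( OutputStream out ) throws IOException;
  }

  // Hypothetical framework call: opens the stream, runs the callback, closes the stream.
  static void write( File target, Callback callback )
  {
    try ( OutputStream out = new FileOutputStream( target ) )
    {
      callback.process( out );
    }
    catch ( IOException e )
    {
      throw new UncheckedIOException( e );
    }
  }

  // Returns true if writing to the stashed stream fails once the callback has returned.
  static boolean savedStreamFailsAfterCallback()
  {
    try
    {
      File target = File.createTempFile( "saved-stream", ".tmp" );
      target.deleteOnExit();

      AtomicReference< OutputStream > saved = new AtomicReference<>();
      write( target, out -> { saved.set( out ); out.write( 'a' ); } );

      try
      {
        saved.get().write( 'b' );   // the stream was already closed by write()
        return false;
      }
      catch ( IOException expected )
      {
        return true;
      }
    }
    catch ( IOException e )
    {
      throw new UncheckedIOException( e );
    }
  }

  public static void main( String[] args )
  {
    System.out.println( "late write fails: " + savedStreamFailsAfterCallback() );
  }
}
```

Under that assumption, a stream reference stashed inside the callback is of no use once the callback has returned.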

Russ

On 3/27/20 10:03 AM, Mike Thomsen wrote:
session.read(FlowFile) just gives you an InputStream. You should be able
to rerun that as many times as you want provided you properly close it.
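
As a rough plain-Java illustration of that idea (no NiFi API involved): the Supplier< InputStream > below is a hypothetical stand-in for whatever reopens the content, such as a fresh session.read() call, and countBytes() is just an example consumer. Each pass gets its own stream and closes it.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

final class RereadSketch
{
  // Hypothetical helper: opens a fresh stream, counts its bytes and closes it.
  static long countBytes( Supplier< InputStream > source )
  {
    try ( InputStream in = source.get() )
    {
      long total = 0;
      byte[] buffer = new byte[ 8192 ];
      for ( int n; ( n = in.read( buffer ) ) != -1; )
        total += n;
      return total;
    }
    catch ( IOException e )
    {
      throw new UncheckedIOException( e );
    }
  }

  public static void main( String[] args )
  {
    byte[] content = "<a><b/></a>".getBytes( StandardCharsets.UTF_8 );

    // Every call to get() hands back a new stream over the same content.
    Supplier< InputStream > source = () -> new ByteArrayInputStream( content );

    System.out.println( countBytes( source ) );   // first pass
    System.out.println( countBytes( source ) );   // second pass, from the start again
  }
}
```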

On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <[email protected]>
wrote:

In my custom processor, I'm using a SAX parser to process an incoming
flowfile that's in XML. However, this particular XML is in essence
two different files, and I would like to split it, then read and process
the first "half" (which starts a couple of lines, i.e. XML elements, into
the file) without using the SAX parser. At the end, I would stream the
output of the first half, then the SAX-processed second half.

So, in short:

   1. process the incoming flowfile for the early content not using SAX,
      but merely copying as-is; at all cost I must avoid "reassembling"
      the first half using my SAX handler (what I'm doing now),
   2. output the first part down the output stream to the resulting
      flowfile,
   3. (re)process the incoming flowfile using SAX (I can just skip
      over the first bit) and spit the result of this second part
      down the output stream of the resulting flowfile.
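
The three steps above could be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions, not NiFi code: the element names <first> and <second>, the one-tag-per-line layout, the "element: ..." output format and the Supplier< InputStream > that reopens the content are all hypothetical.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

final class TwoPassSketch
{
  static void process( Supplier< InputStream > source, OutputStream out )
  {
    try
    {
      Writer writer = new OutputStreamWriter( out, StandardCharsets.UTF_8 );

      // Pass 1: copy the <first>...</first> lines verbatim, no SAX involved.
      try ( BufferedReader reader = new BufferedReader(
              new InputStreamReader( source.get(), StandardCharsets.UTF_8 ) ) )
      {
        boolean copying = false;
        for ( String line; ( line = reader.readLine() ) != null; )
        {
          if ( line.contains( "<first>" ) )
            copying = true;
          if ( copying )
            writer.write( line + "\n" );
          if ( line.contains( "</first>" ) )
            break;
        }
      }

      // Pass 2: reopen the content and let SAX handle only what is inside <second>.
      try ( InputStream in = source.get() )
      {
        SAXParserFactory.newInstance().newSAXParser().parse( in, new DefaultHandler()
        {
          private boolean inSecond = false;

          @Override
          public void startElement( String uri, String localName, String qName, Attributes attributes )
          {
            if ( inSecond )
              emit( "element: " + qName + "\n" );
            else if ( qName.equals( "second" ) )
              inSecond = true;
          }

          @Override
          public void endElement( String uri, String localName, String qName )
          {
            if ( qName.equals( "second" ) )
              inSecond = false;
          }

          private void emit( String text )
          {
            try { writer.write( text ); }
            catch ( IOException e ) { throw new UncheckedIOException( e ); }
          }
        } );
      }

      writer.flush();   // both passes wrote to the same output stream
    }
    catch ( Exception e )
    {
      throw new RuntimeException( e );
    }
  }

  public static void main( String[] args )
  {
    String xml = "<doc>\n<first>\n<x>1</x>\n</first>\n<second>\n<y>2</y>\n</second>\n</doc>\n";
    byte[] bytes = xml.getBytes( StandardCharsets.UTF_8 );
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    process( () -> new ByteArrayInputStream( bytes ), out );
    System.out.print( new String( out.toByteArray(), StandardCharsets.UTF_8 ) );
  }
}
```

Neither pass holds the whole content in memory; the only requirement is that the source can be opened a second time.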

I guess this is tantamount to asking how, in Java, I can read an input
stream twice (or one-half plus one times). Maybe it's less a NiFi
developer question and more a Java question. I have looked at it that
way too, but, if one of you knows (particularly NiFi) best practice, I
would very much like to hear about it.

Thanks.



