If these files are only a few MB at most, you could also just read them into a ByteArrayOutputStream and re-read the buffered bytes as often as you need. Just a thought.
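Here is a minimal sketch of that suggestion. It assumes the content fits comfortably in the heap. The stream is drained once into a ByteArrayOutputStream, and each later read gets a fresh ByteArrayInputStream over the same bytes. The class name InMemoryBuffer is hypothetical and is not part of NiFi.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Hypothetical helper (not a NiFi API): drains a stream once into memory so the
 * content can be re-read any number of times. Only sensible for small content.
 */
final class InMemoryBuffer {
    private final byte[] bytes;

    private InMemoryBuffer(byte[] bytes) {
        this.bytes = bytes;
    }

    /** Copies the whole stream into a ByteArrayOutputStream and keeps its bytes. */
    static InMemoryBuffer of(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        return new InMemoryBuffer(buffer.toByteArray());
    }

    /** Each call returns an independent stream positioned at the start of the content. */
    InputStream open() {
        return new ByteArrayInputStream(bytes);
    }

    /** Number of buffered bytes. */
    int size() {
        return bytes.length;
    }
}
```

Inside a callback, the InputStream handed to process() would be the source. You could then read buffer.open() once for the verbatim copy and again for SAX. This is only the small-file case, though. Russ notes further down that his content is too large to hold in memory.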
On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <[email protected]> wrote:
> Joe and Mike,
>
> Sadly, I was not able to get very far on this. It seems that, to the extent
> that I copy the first half of the input stream's contents, I lose what comes
> after when I try to read it again. That lost part is basically the second half,
> comprising the <metadata> and <demographics> elements that I was hoping to
> SAX-parse. Here's the code and its output.
>
>   try
>   {
>     InputStream inputStream = session.read( flowfile );
>     System.out.println( "This is the input stream first time around (before copying to output stream)..." );
>     System.out.println( StreamUtilities.fromStream( inputStream ) );
>     inputStream.close();
>   }
>   catch( IOException e )
>   {
>     e.printStackTrace();
>   }
>   flowfile = session.write( flowfile, new StreamCallback()
>   {
>     @Override
>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>     {
>       System.out.println( "And now, let's copy..." );
>       CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
>     }
>   } );
>   try
>   {
>     InputStream inputStream = session.read( flowfile );
>     System.out.println( "This is the input stream second time around (after copying)..." );
>     System.out.println( StreamUtilities.fromStream( inputStream ) );
>     inputStream.close();
>   }
>   catch( IOException e )
>   {
>     e.printStackTrace();
>   }
>   // ...on to SAX parser which dies because the input has been truncated to
>   // exactly what was written out to the output stream
>
> Output of above:
>
>   This is the input stream first time around (before copying to output stream)...
>   <cxml>
>   <document>
>   This is the original document.
>   </document>
>   <metadata>
>   <date_of_service>2016-06-28 13:23</date_of_service>
>   </metadata>
>   <demographics>
>   <date_of_birth>1980-07-01</date_of_birth>
>   <age>36</age>
>   </demographics>
>   </cxml>
>
>   And now, let's copy...
>   This is the input stream second time around (after copying)...
>   <cxml>
>   <document>
>   This is the original document.
>   </document>
>   And now, we'll go on to the SAX parser...
>   <cxml> <document> This is the original document. </document>
>   [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>   org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>   document structures must start and end within the same entity.
>
> I left out the code that prints "And now, we'll go on to the SAX parser...".
> It's in the next flowfile = session.write( ... ). I have unit tests that
> verify that copyCxmlHeaderAndDocumentToOutput() works correctly. The SAX error
> occurs because the "file" is truncated: SAX finds the first "half" just fine,
> but there is no second "half". If I comment out the copying from input stream
> to output stream, the error doesn't occur; the whole document is there.
>
> Thanks for looking at this again if you can,
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
>> You should be able to call write as many times as you need. Just keep
>> passing the resulting flowfile reference into the next call.
>>
>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <[email protected]> wrote:
>>> Mike,
>>>
>>> Many thanks for responding. Do you mean to say that all I have to do is
>>> something like this?
>>>
>>>   public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>>>   {
>>>     FlowFile flowfile = session.get();
>>>     ...
>>>
>>>     // this will be our resulting flowfile...
>>>     AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>>>
>>>     /* Do some processing on the in-coming flowfile, then close its input stream,
>>>      * but save the output stream for continued use.
>>>      */
>>>     session.write( flowfile, new StreamCallback()
>>>     {
>>>       @Override
>>>       public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>       {
>>>         savedOutputStream.set( outputStream );
>>>         ...
>>>
>>>         // processing puts some output on the output stream...
>>>         outputStream.write( etc. );
>>>
>>>         inputStream.close();
>>>       }
>>>     } );
>>>
>>>     /* Start over, doing different processing on the (same/reopened) in-coming
>>>      * flowfile while continuing to use the original output stream. It's our
>>>      * responsibility to close the saved output stream; NiFi closes the unused
>>>      * output stream it opened for us, which we ignore.
>>>      */
>>>     session.write( flowfile, new StreamCallback()
>>>     {
>>>       @Override
>>>       public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>       {
>>>         outputStream = savedOutputStream.get(); // (discard the new output stream)
>>>         ...
>>>
>>>         // processing puts (some more) output on the original output stream...
>>>         outputStream.write( etc. );
>>>
>>>         outputStream.close();
>>>       }
>>>     } );
>>>
>>>     session.transfer( flowfile, etc. );
>>>   }
>>>
>>> I'm wondering whether this will work to "discard" the new output stream
>>> opened for me (the second time) and replace it with the original one,
>>> which was probably closed when the first call to session.write() finished.
>>> What's on these streams is far too big for me to hold in temporary memory,
>>> say, a ByteArrayOutputStream.
>>>
>>> Russ
>>>
>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>> session.read(FlowFile) just gives you an InputStream.
>>>> You should be able to rerun that as many times as you want, provided you
>>>> close it properly.
>>>>
>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <[email protected]> wrote:
>>>>> In my custom processor, I'm using a SAX parser to process an incoming
>>>>> flowfile that's in XML. However, this particular XML is in essence two
>>>>> different files. I would like to split it, then read and process the
>>>>> first "half" without using the SAX parser. That half starts a couple of
>>>>> lines (XML elements) into the file. At the end, I would stream out the
>>>>> first half, then the SAX-processed second half.
>>>>>
>>>>> So, in short:
>>>>>
>>>>>   1. process the incoming flowfile for the early content without SAX,
>>>>>      merely copying it as-is; at all costs I must avoid "reassembling"
>>>>>      the first half using my SAX handler (which is what I'm doing now),
>>>>>   2. write the first part down the output stream to the resulting flowfile,
>>>>>   3. (re)process the incoming flowfile using SAX (I can just skip over
>>>>>      the first bit) and write the result of this second part down the
>>>>>      output stream of the resulting flowfile.
>>>>>
>>>>> I guess this is tantamount to asking how, in Java, I can read an input
>>>>> stream twice (or one and a half times). Maybe it's less a NiFi developer
>>>>> question and more a Java question. I have looked at it that way too,
>>>>> but if one of you knows the best practice (particularly for NiFi), I
>>>>> would very much like to hear about it.
>>>>>
>>>>> Thanks.
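A single pass can do the "one and a half reads" without buffering the whole file. This is a sketch under stated assumptions, not established NiFi practice. Inside one StreamCallback, copy bytes verbatim to the output until the first </document> has been written. Then hand the rest of the same input stream to SAX, with a synthetic <cxml> start tag prepended through SequenceInputStream so that the remainder is well-formed on its own. The assumptions are that the first half ends at the first "</document>" and that the root is a bare <cxml> with no attributes. CxmlSplitter, copyHeadThenParseRest and LeafTextCollector are hypothetical names.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

/** Hypothetical: one pass, first half copied verbatim, second half SAX-parsed. */
final class CxmlSplitter {
    // Assumption: the first half ends at the first "</document>"; the root is a bare <cxml>.
    private static final byte[] END_OF_FIRST_HALF = "</document>".getBytes(StandardCharsets.UTF_8);
    private static final byte[] ROOT_OPEN = "<cxml>".getBytes(StandardCharsets.UTF_8);

    static void copyHeadThenParseRest(InputStream source, OutputStream out, DefaultHandler handler)
            throws IOException, SAXException, ParserConfigurationException {
        // Wrap once and keep reading from this same object, so read-ahead bytes are not lost.
        InputStream in = new BufferedInputStream(source);
        byte[] window = new byte[END_OF_FIRST_HALF.length]; // last bytes copied so far
        int filled = 0;
        boolean found = false;
        int b;
        // Copy byte by byte until the most recent bytes written equal the marker.
        while (!found && (b = in.read()) != -1) {
            out.write(b);
            System.arraycopy(window, 1, window, 0, window.length - 1);
            window[window.length - 1] = (byte) b;
            filled = Math.min(filled + 1, window.length);
            found = filled == window.length && Arrays.equals(window, END_OF_FIRST_HALF);
        }
        if (!found) {
            throw new IOException("marker </document> not found");
        }
        // The remainder (<metadata>...</demographics></cxml>) lacks its start tag,
        // so re-open the root element in front of it before handing it to SAX.
        InputStream rest = new SequenceInputStream(new ByteArrayInputStream(ROOT_OPEN), in);
        SAXParserFactory.newInstance().newSAXParser().parse(rest, handler);
    }
}

/** Hypothetical example handler: records the trimmed text of each leaf element. */
final class LeafTextCollector extends DefaultHandler {
    final Map<String, String> values = new LinkedHashMap<>();
    private final StringBuilder text = new StringBuilder();

    @Override
    public void startElement(String uri, String localName, String qName, Attributes atts) {
        text.setLength(0);
    }

    @Override
    public void characters(char[] ch, int start, int length) {
        text.append(ch, start, length);
    }

    @Override
    public void endElement(String uri, String localName, String qName) {
        String t = text.toString().trim();
        if (!t.isEmpty()) {
            values.put(qName, t);
        }
        text.setLength(0);
    }
}
```

In a processor, the call would presumably sit inside StreamCallback.process(), for example CxmlSplitter.copyHeadThenParseRest( inputStream, outputStream, handler ). The FlowFile returned by session.write() would then be kept for any later step, as Joe describes. process() only declares IOException, so the SAX and parser-configuration exceptions would need to be wrapped. A real handler would write the processed second half, and the closing </cxml>, to the same outputStream. Note that a SAX parser may close the stream it was given once parsing finishes.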
