Sorry, typo. ‘modified’ should have been ‘output’.
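
For anyone who wants to try the two-pass idea outside NiFi first, here is a minimal plain-Java sketch. It is not NiFi code and not necessarily how the real helpers look: the names copyFirstHalf and processWithSaxParser are borrowed from the pattern quoted below, but their bodies here are guesses, and TwoPassSketch, run and reopen are hypothetical. A ByteArrayInputStream stands in for each fresh read of the original content. The SAX handler is deliberately simplified (it drops attributes and does not escape text) and only trims element text to show "a few changes".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

class TwoPassSketch {
    static final String END_MARKER = "</document>";

    // Pass 1: copy bytes verbatim up to and including the first END_MARKER.
    // The simple fallback below is only valid because '<' occurs once in the marker.
    static void copyFirstHalf(InputStream in, OutputStream out) throws IOException {
        byte[] marker = END_MARKER.getBytes(StandardCharsets.UTF_8);
        int matched = 0;
        int b;
        while ((b = in.read()) != -1) {
            out.write(b);
            matched = (b == marker[matched]) ? matched + 1 : (b == marker[0] ? 1 : 0);
            if (matched == marker.length) {
                return;
            }
        }
    }

    // Pass 2: parse the whole input again, skip the root start tag and the
    // <document> element (both already copied), and append the rest to 'out'.
    static void processWithSaxParser(InputStream in, OutputStream out) throws Exception {
        DefaultHandler handler = new DefaultHandler() {
            private int depth = 0;
            private int skipDepth = -1; // depth at which <document> opened, or -1
            private final StringBuilder text = new StringBuilder();

            private void emit(String s) {
                try {
                    out.write(s.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            private void flushText() {
                String trimmed = text.toString().trim();
                text.setLength(0);
                if (!trimmed.isEmpty()) {
                    emit(trimmed);
                }
            }

            @Override
            public void startElement(String uri, String local, String qName, Attributes atts) {
                depth++;
                if (skipDepth != -1) return;          // inside <document>
                if (qName.equals("document")) { skipDepth = depth; return; }
                flushText();
                if (depth > 1) emit("<" + qName + ">"); // root tag was copied in pass 1
            }

            @Override
            public void characters(char[] ch, int start, int length) {
                if (skipDepth == -1 && depth > 1) text.append(ch, start, length);
            }

            @Override
            public void endElement(String uri, String local, String qName) {
                if (skipDepth != -1) {
                    if (depth == skipDepth) skipDepth = -1;
                    depth--;
                    return;
                }
                flushText();
                emit("</" + qName + ">");
                depth--;
            }
        };
        SAXParserFactory.newInstance().newSAXParser().parse(in, handler);
    }

    // Opens the same content twice and writes both passes to one output.
    static String run(String xml) {
        byte[] original = xml.getBytes(StandardCharsets.UTF_8);
        Supplier<InputStream> reopen = () -> new ByteArrayInputStream(original);
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        try (InputStream first = reopen.get(); InputStream second = reopen.get()) {
            copyFirstHalf(first, output);
            processWithSaxParser(second, output);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return new String(output.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(run(
            "<cxml><document> keep  this </document><metadata><age> 36 </age></metadata></cxml>"));
    }
}
```

The point of the sketch is only that the output stream is opened once and both reads feed into it, so the first half keeps its exact spacing while the second half goes through the parser.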
> On Mar 31, 2020, at 2:44 PM, Russell Bateman <r...@windofkeltia.com> wrote:
>
> (Oh, I see where *out* comes from, but not *modified*.)
>
> On 3/31/20 12:35 PM, Russell Bateman wrote:
>> Wait, where is *modified* from?
>>
>> Thanks
>>
>> On 3/31/20 12:24 PM, Mark Payne wrote:
>>> Russ,
>>>
>>> OK, so then I think the pattern you’d want to follow would be something
>>> like this:
>>>
>>> FlowFile original = session.get();
>>> if (original == null) {
>>> return;
>>> }
>>>
>>> FlowFile output = session.create(original);
>>>
>>> // Begin writing to ‘output flowfile'
>>> output = session.write(*modified*, new OutputStreamCallback() {
>>> void process(OutputStream *out*) {
>>>
>>> // read from original FlowFile
>>> session.read(original, new InputStreamCallback() {
>>> void process(InputStream in) {
>>> copyFirstHalf(in, out);
>>> }
>>> });
>>>
>>>
>>> // read from original FlowFile a second time. Use a SAX parser to
>>> // parse it and write to the end of the ‘output flowfile'
>>> session.read(original, new InputStreamCallback() {
>>> void process(InputStream in) {
>>> processWithSaxParser(in, *out*);
>>> }
>>> });
>>>
>>> }
>>> });
>>>
>>> session.transfer(output, REL_SUCCESS);
>>> session.remove(original);
>>>
>>>
>>> Thanks
>>> -Mark
>>>
>>>
>>>> On Mar 31, 2020, at 2:04 PM, Russell Bateman <r...@windofkeltia.com> wrote:
>>>>
>>>> Mark,
>>>>
>>>> Thanks for getting back. My steps are:
>>>>
>>>> 1. Read the "first half" of the input stream copying it to the output
>>>> stream. This is because I need to preserve the exact form of it (spacing,
>>>> indentation, lines, etc.) without change whatsoever. If I
>>>>
>>>> 2. Reopen the stream from the beginning with a SAX parser. Its handler,
>>>> which I wrote, will ignore the original part that I'm holding for
>>>> sacred--everything between <document> and </document>.
>>>>
>>>> 3. The SAX handler writes the rest of the XML with a few changes out
>>>> appending it to that same output stream on which the original "half" was
>>>> written. (This does not seem to work.)
>>>>
>>>> I was not seeing this as "overwriting" flowfile content, but, in my tiny
>>>> little mind, I imagined an input stream, which I want to read exactly a)
>>>> one-half, then again, b) one-whole time, and an output stream to which I
>>>> start to write by copying (a), followed by a modification of (b) yet, the
>>>> whole (b) or "second half." Then I'm done. I was thinking of the input
>>>> stream as from the in-coming flowfile and a separate thing from the output
>>>> stream which I see as being offered to me for my use in creating a new
>>>> flowfile to transfer to. I guess this is not how it works.
>>>>
>>>> My in-coming flowfiles can be megabytes in size. Copying to a string is
>>>> not an option. Copying to a temporary file "isn't NiFi" as I understand
>>>> it. I was hoping to avoid writing another processor or two to a) break up
>>>> the flowfile into <document> ... </document> and (all the rest), fix (all
>>>> the rest), then stitch the two back together in a later processor. I see
>>>> having to coordinate the two halves of what used to be one file fraught
>>>> with precarity and confusion, but I guess that's the solution I'm left
>>>> with?
>>>>
>>>> Thanks,
>>>> Russ
>>>>
>>>>
>>>> On 3/31/20 10:23 AM, Mark Payne wrote:
>>>>> Russ,
>>>>>
>>>>> As far as I can tell, this is working exactly as expected.
>>>>>
>>>>> To verify, I created a simple Integration test, as well, which I attached
>>>>> below.
>>>>>
>>>>> Let me outline what I *think* you’re trying to do here and please correct
>>>>> me if I’m wrong:
>>>>>
>>>>> 1. Read the content of the FlowFile. (Via session.read)
>>>>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>>>>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>>>>>
>>>>> The third step is the part where I’m confused. You’re calling
>>>>> session.write() again. In the callback, you’ll receive an InputStream
>>>>> that contains the contents of the FlowFile (which have now been modified,
>>>>> per Step 2). You’re also given an OutputStream to write the new content
>>>>> to.
>>>>> If you then return without writing anything to the OutputStream, as in
>>>>> the example that you attached, then yes, you’ll have erased all of the
>>>>> FlowFile’s content.
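
To make that overwrite behaviour concrete, here is a toy plain-Java model. It is an assumption-laden illustration, not NiFi's implementation: OverwriteModel, its Callback interface and demo are hypothetical names, and the only rule modelled is the one described above, namely that the callback sees the current content and whatever it writes becomes the new content.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class OverwriteModel {
    // Hypothetical callback with the same two-stream shape used in this thread.
    interface Callback {
        void process(InputStream in, OutputStream out) throws IOException;
    }

    private byte[] content;

    OverwriteModel(String initial) {
        content = initial.getBytes(StandardCharsets.UTF_8);
    }

    // The callback reads the current content; only what it writes survives.
    void write(Callback callback) {
        ByteArrayOutputStream replacement = new ByteArrayOutputStream();
        try {
            callback.process(new ByteArrayInputStream(content), replacement);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        content = replacement.toByteArray();
    }

    String read() {
        return new String(content, StandardCharsets.UTF_8);
    }

    // Returns the content after a partial copy and after an empty write.
    static String[] demo() {
        OverwriteModel model = new OverwriteModel("<a>first</a><b>second</b>");

        // First write: copy only the first 12 bytes, i.e. "<a>first</a>".
        model.write((in, out) -> {
            byte[] buf = new byte[12];
            int n = in.read(buf);
            out.write(buf, 0, n);
        });
        String afterPartialCopy = model.read();

        // Second write: return without writing anything.
        model.write((in, out) -> { });
        String afterEmptyWrite = model.read();

        return new String[] { afterPartialCopy, afterEmptyWrite };
    }

    public static void main(String[] args) {
        String[] results = demo();
        System.out.println("after partial copy: [" + results[0] + "]");
        System.out.println("after empty write:  [" + results[1] + "]");
    }
}
```

In this model the second half is gone after the first write, and everything is gone after the second, which matches the symptoms reported further down the thread.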
>>>>>
>>>>> It’s unclear to me exactly what you’re attempting to accomplish in the
>>>>> third step. It *sounds* like you’re expecting the content of the
>>>>> original/incoming FlowFile. But you’re not going to get that because
>>>>> you’ve already overwritten that FlowFile’s content. If that is what
>>>>> you’re trying to do, I think what you’d want to do is something more like
>>>>> this:
>>>>>
>>>>> FlowFile original = session.get();
>>>>> if (original == null) {
>>>>> return;
>>>>> }
>>>>>
>>>>> session.read(original, new InputStreamCallback() {…});
>>>>>
>>>>> // Create a ‘child’ flow file whose content is equal to the original
>>>>> // FlowFile’s content.
>>>>> FlowFile childFlowFile = session.create(original);
>>>>> session.write(childFlowFile, new StreamCallback() {…});
>>>>>
>>>>> // Read the original FlowFile’s content
>>>>> session.read(original, new InputStreamCallback() { … });
>>>>>
>>>>> session.transfer(childFlowFile, REL_SUCCESS);
>>>>> session.remove(original); // or transfer to an ‘original’ relationship or
>>>>> // whatever makes sense for you.
>>>>>
>>>>>
>>>>>
>>>>> Hope this helps!
>>>>> -Mark
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>>
>>>>>> If I haven't worn out my welcome, here is the simplified code that
>>>>>> should demonstrate either that I have miscoded your suggestions or that
>>>>>> the API doesn't in fact work as advertised. First, the output. The code,
>>>>>> both JUnit test and processor are attached and the files are pretty
>>>>>> small.
>>>>>>
>>>>>> Much thanks,
>>>>>> Russ
>>>>>>
>>>>>> This is the input stream first time around (before copying)
>>>>>> ===================================
>>>>>> * * * session.read( flowfile );
>>>>>> Here's what's in input stream:
>>>>>> <cxml>
>>>>>> <document>
>>>>>> This is the original document.
>>>>>> </document>
>>>>>> <metadata>
>>>>>> <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>> </metadata>
>>>>>> <demographics>
>>>>>> <date_of_birth>1980-07-01</date_of_birth>
>>>>>> <age>36</age>
>>>>>> </demographics>
>>>>>> </cxml>
>>>>>>
>>>>>> And now, let's copy some of the input stream to the output stream
>>>>>> =============================
>>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>> Copying input stream to output stream up to </document>...
>>>>>> The output stream has in it at this point:
>>>>>> <cxml>
>>>>>> <document>
>>>>>> This is the original document.
>>>>>> </document>
>>>>>> [1. When we examine the output stream, it has what we expect.]
>>>>>>
>>>>>> After copying, can we reopen input stream intact and does outputstream
>>>>>> have what we think? ====
>>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>> Here's what's in input stream:
>>>>>> <cxml>
>>>>>> <document>
>>>>>> This is the original document.
>>>>>> </document>
>>>>>>
>>>>>> [2. The input stream as reported just above is truncated by exactly the
>>>>>> content we did
>>>>>> not copy to the output stream. We expected to see the entire,
>>>>>> original file, but the
>>>>>> second half is gone.]
>>>>>>
>>>>>> Here's what's in the output stream at this point:
>>>>>> (nothing)
>>>>>>
>>>>>> [3. The content we copied to the output stream has disappeared. Does it
>>>>>> disappear simply
>>>>>> because we looked at it (printed it out here)?]
>>>>>>
>>>>>>
>>>>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>>>>> Russell
>>>>>>>
>>>>>>> I recommend writing very simple code that does two successive read/write
>>>>>>> operations on basic data so you can make sure the API works as expected.
>>>>>>> Then add the xml bits.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mikerthom...@gmail.com> wrote:
>>>>>>>
>>>>>>>> If these files are only a few MB at the most, you can also just export
>>>>>>>> them to a ByteArrayOutputStream. Just a thought.
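
A minimal plain-Java sketch of that buffering idea, for the case where the content really is small: drain the stream into a byte array once, then open as many independent readers over it as needed. BufferOnceReadTwice, drain and readTwice are hypothetical names, and nothing here is NiFi-specific. It holds the whole content in memory, so it does not fit content that is too large for that.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class BufferOnceReadTwice {
    // Drain the stream into memory; only sensible when the content is small.
    static byte[] drain(InputStream in) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns what two independent passes over the buffered bytes see.
    static String[] readTwice(InputStream source) {
        byte[] bytes = drain(source);
        String first = new String(drain(new ByteArrayInputStream(bytes)), StandardCharsets.UTF_8);
        String second = new String(drain(new ByteArrayInputStream(bytes)), StandardCharsets.UTF_8);
        return new String[] { first, second };
    }

    public static void main(String[] args) {
        InputStream source = new ByteArrayInputStream(
            "<cxml><document/></cxml>".getBytes(StandardCharsets.UTF_8));
        String[] passes = readTwice(source);
        System.out.println("both passes saw the same content: " + passes[0].equals(passes[1]));
    }
}
```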
>>>>>>>>
>>>>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>>>>
>>>>>>>>> Joe and Mike,
>>>>>>>>>
>>>>>>>>> Sadly, I was not able to get very far on this. It seems that, to the
>>>>>>>>> extent that I copy the first half of the contents of the input stream, I
>>>>>>>>> lose what comes after when I try to read again: basically, the second
>>>>>>>>> half comprising the <metadata> and <demographics> elements, which I was
>>>>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>>>>> output to make it easier to read.
>>>>>>>>>
>>>>>>>>> try
>>>>>>>>> {
>>>>>>>>>   InputStream inputStream = session.read( flowfile );
>>>>>>>>>   System.out.println( "This is the input stream first time around (before copying to output stream)..." );
>>>>>>>>>   System.out.println( StreamUtilities.fromStream( inputStream ) );
>>>>>>>>>   inputStream.close();
>>>>>>>>> }
>>>>>>>>> catch( IOException e )
>>>>>>>>> {
>>>>>>>>>   e.printStackTrace();
>>>>>>>>> }
>>>>>>>>> flowfile = session.write( flowfile, new StreamCallback()
>>>>>>>>> {
>>>>>>>>>   @Override
>>>>>>>>>   public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>>   {
>>>>>>>>>     System.out.println( "And now, let's copy..." );
>>>>>>>>>     CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
>>>>>>>>>   }
>>>>>>>>> } );
>>>>>>>>> try
>>>>>>>>> {
>>>>>>>>>   InputStream inputStream = session.read( flowfile );
>>>>>>>>>   System.out.println( "This is the input stream second time around (after copying)..." );
>>>>>>>>>   System.out.println( StreamUtilities.fromStream( inputStream ) );
>>>>>>>>>   inputStream.close();
>>>>>>>>> }
>>>>>>>>> catch( IOException e )
>>>>>>>>> {
>>>>>>>>>   e.printStackTrace();
>>>>>>>>> }
>>>>>>>>> // ...on to SAX parser which dies because the input has been truncated to
>>>>>>>>> // exactly what was written out to the output stream
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Output of above:
>>>>>>>>>
>>>>>>>>> This is the input stream first time around (before copying to output
>>>>>>>>> stream)...
>>>>>>>>> <cxml>
>>>>>>>>> <document>
>>>>>>>>> This is the original document.
>>>>>>>>> </document>
>>>>>>>>> <metadata>
>>>>>>>>> <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>>>> </metadata>
>>>>>>>>> <demographics>
>>>>>>>>> <date_of_birth>1980-07-01</date_of_birth>
>>>>>>>>> <age>36</age>
>>>>>>>>> </demographics>
>>>>>>>>> </cxml>
>>>>>>>>>
>>>>>>>>> And now, let's copy...
>>>>>>>>> This is the input stream second time around (after copying)...
>>>>>>>>> <cxml>
>>>>>>>>> <document>
>>>>>>>>> This is the original document.
>>>>>>>>> </document>
>>>>>>>>> And now, we'll go on to the SAX parser...
>>>>>>>>> <cxml> <document> This is the original document. </document>
>>>>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>>>>> document structures must start and end within the same entity.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>>>>> tests that verify the good functioning of
>>>>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>>>>>
>>>>>>>>> Thanks for looking at this again if you can,
>>>>>>>>> Russ
>>>>>>>>>
>>>>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>>>>> you should be able to call write as many times as you need. just keep
>>>>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Mike,
>>>>>>>>>>>
>>>>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do is
>>>>>>>>>>> something like this?
>>>>>>>>>>>
>>>>>>>>>>> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>>>>>>>>>>> {
>>>>>>>>>>>   FlowFile flowfile = session.get();
>>>>>>>>>>>   ...
>>>>>>>>>>>
>>>>>>>>>>>   // this will be our resulting flowfile...
>>>>>>>>>>>   AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>>>>>>>>>>>
>>>>>>>>>>>   /* Do some processing on the in-coming flowfile then close its input stream, but
>>>>>>>>>>>    * save the output stream for continued use.
>>>>>>>>>>>    */
>>>>>>>>>>>   session.write( flowfile, new StreamCallback()
>>>>>>>>>>>   {
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>>>>     {
>>>>>>>>>>>       savedOutputStream.set( outputStream );
>>>>>>>>>>>       ...
>>>>>>>>>>>
>>>>>>>>>>>       // processing puts some output on the output stream...
>>>>>>>>>>>       outputStream.write( etc. );
>>>>>>>>>>>
>>>>>>>>>>>       inputStream.close();
>>>>>>>>>>>     }
>>>>>>>>>>>   } );
>>>>>>>>>>>
>>>>>>>>>>>   /* Start over doing different processing on the (same/reopened) in-coming flowfile
>>>>>>>>>>>    * continuing to use the original output stream. It's our responsibility to close
>>>>>>>>>>>    * the saved output stream, NiFi closes the unused output stream opened, but
>>>>>>>>>>>    * ignored by us.
>>>>>>>>>>>    */
>>>>>>>>>>>   session.write( flowfile, new StreamCallback()
>>>>>>>>>>>   {
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>>>>     {
>>>>>>>>>>>       outputStream = savedOutputStream.get(); // (discard the new output stream)
>>>>>>>>>>>       ...
>>>>>>>>>>>
>>>>>>>>>>>       // processing puts (some more) output on the original output stream...
>>>>>>>>>>>       outputStream.write( etc. );
>>>>>>>>>>>
>>>>>>>>>>>       outputStream.close();
>>>>>>>>>>>     }
>>>>>>>>>>>   } );
>>>>>>>>>>>
>>>>>>>>>>>   session.transfer( flowfile, etc. );
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>>>>>> which was probably closed when the first call to
>>>>>>>>>>> session.write() finished. What's on these streams is way too big for me
>>>>>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>>>>>
>>>>>>>>>>> Russ
>>>>>>>>>>>
>>>>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be able
>>>>>>>>>>>> to rerun that as many times as you want provided you properly close it.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an incoming
>>>>>>>>>>>>> flowfile that's in XML. Except that this particular XML is in essence
>>>>>>>>>>>>> two different files, and I would like to split, read and process the
>>>>>>>>>>>>> first "half" (which starts a couple of lines (XML elements) into the
>>>>>>>>>>>>> file) not using the SAX parser. At the end, I would stream the output of
>>>>>>>>>>>>> the first half, then the SAX-processed second half.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So, in short:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. process the incoming flowfile for the early content not using SAX,
>>>>>>>>>>>>>    but merely copying as-is; at all cost I must avoid "reassembling"
>>>>>>>>>>>>>    the first half using my SAX handler (what I'm doing now),
>>>>>>>>>>>>> 2. output the first part down the output stream to the resulting
>>>>>>>>>>>>>    flowfile,
>>>>>>>>>>>>> 3. (re)process the incoming flowfile using SAX (and I can just skip
>>>>>>>>>>>>>    over the first bit) and spitting the result of this second part out
>>>>>>>>>>>>>    down the output stream of the resulting flowfile.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an input
>>>>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>>>>> developer question and more a Java question. I have looked at it that
>>>>>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best practice, I
>>>>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>>
>