Sorry, typo. ‘modified’ should have been ‘output’.
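
For anyone who wants to try the two-pass idea outside NiFi first, here is a minimal sketch in plain Java. It is only an illustration under assumptions, not the processor code: the class name TwoPassSketch, the Supplier<InputStream> that stands in for calling session.read(original) twice, and the ByteArrayOutputStream that stands in for the output FlowFile's stream are all hypothetical. The helper names copyFirstHalf and processWithSax mirror the placeholders used in the quoted example, and the SAX handler re-emits elements unchanged and drops attributes for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

class TwoPassSketch {
    static final String END_MARKER = "</document>";

    // Pass 1: copy bytes verbatim up to and including the end marker.
    // '<' occurs only at the start of the marker, so a mismatch can only
    // restart the match at position 1 (if the byte is '<') or 0.
    static void copyFirstHalf(InputStream in, OutputStream out) throws IOException {
        byte[] marker = END_MARKER.getBytes(StandardCharsets.UTF_8);
        int matched = 0;
        int b;
        while (matched < marker.length && (b = in.read()) != -1) {
            out.write(b);
            matched = (b == marker[matched]) ? matched + 1 : (b == marker[0] ? 1 : 0);
        }
    }

    // Pass 2: parse the whole input again, ignore everything up to the end of
    // <document>, and append whatever follows to the same output stream.
    static void processWithSax(InputStream in, OutputStream out) throws Exception {
        DefaultHandler handler = new DefaultHandler() {
            boolean pastDocument = false;

            @Override
            public void startElement(String uri, String local, String qName, Attributes atts) {
                // Attributes are omitted here for brevity (assumption: none are needed).
                if (pastDocument) emit("<" + qName + ">");
            }

            @Override
            public void endElement(String uri, String local, String qName) {
                if (pastDocument) emit("</" + qName + ">");
                else if (qName.equals("document")) pastDocument = true;
            }

            @Override
            public void characters(char[] ch, int start, int length) {
                if (pastDocument) emit(escape(new String(ch, start, length)));
            }

            void emit(String s) {
                try {
                    out.write(s.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
        SAXParserFactory.newInstance().newSAXParser().parse(in, handler);
    }

    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    // Opens the source twice and writes both passes to a single output.
    static byte[] run(Supplier<InputStream> original) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream in = original.get()) { copyFirstHalf(in, out); }
        try (InputStream in = original.get()) { processWithSax(in, out); }
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        String xml = "<cxml>\n<document>\n    This is the original document.\n</document>\n"
                   + "<metadata>\n<date_of_service>2016-06-28 13:23</date_of_service>\n"
                   + "</metadata>\n</cxml>";
        byte[] bytes = xml.getBytes(StandardCharsets.UTF_8);
        byte[] result = run(() -> new ByteArrayInputStream(bytes));
        System.out.println(new String(result, StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is only that the input is opened twice while a single output stays open across both passes, which is the shape of the session.create / session.write / session.read pattern quoted below. Reading byte by byte is fine for a small test; for large content the input would presumably be wrapped in a BufferedInputStream.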

> On Mar 31, 2020, at 2:44 PM, Russell Bateman <r...@windofkeltia.com> wrote:
> 
> (Oh, I see where *out* comes from, but not *modified*.)
> 
> On 3/31/20 12:35 PM, Russell Bateman wrote:
>> Wait, where is *modified* from?
>> 
>> Thanks
>> 
>> On 3/31/20 12:24 PM, Mark Payne wrote:
>>> Russ,
>>> 
>>> OK, so then I think the pattern you’d want to follow would be something 
>>> like this:
>>> 
>>> FlowFile original = session.get();
>>> if (original == null) {
>>>     return;
>>> }
>>> 
>>> FlowFile output = session.create(original);
>>> 
>>> // Begin writing to 'output flowfile'
>>> output = session.write(*modified*, new OutputStreamCallback() {
>>>     void process(OutputStream *out*) {
>>> 
>>>         // read from original FlowFile
>>>         session.read(original, new InputStreamCallback() {
>>>           void process(InputStream in) {
>>>                copyFirstHalf(in, out);
>>>           }
>>>        });
>>> 
>>> 
>>>         // read from original FlowFile a second time. Use a SAX parser to
>>>         // parse it and write to the end of the 'output flowfile'
>>>        session.read(original, new InputStreamCallback() {
>>>              void process(InputStream in) {
>>>                   processWithSaxParser(in, *out*);
>>>              }
>>>        });
>>> 
>>>     }
>>> });
>>> 
>>> session.transfer(output, REL_SUCCESS);
>>> session.remove(original);
>>> 
>>> 
>>> Thanks
>>> -Mark
>>> 
>>> 
>>>> On Mar 31, 2020, at 2:04 PM, Russell Bateman <r...@windofkeltia.com> wrote:
>>>> 
>>>> Mark,
>>>> 
>>>> Thanks for getting back. My steps are:
>>>> 
>>>> 1. Read the "first half" of the input stream copying it to the output 
>>>> stream. This is because I need to preserve the exact form of it (spacing, 
>>>> indentation, lines, etc.) without change whatsoever. If I
>>>> 
>>>> 2. Reopen the stream from the beginning with a SAX parser. Its handler,
>>>> which I wrote, will ignore the original part that I'm holding
>>>> sacred--everything between <document> and </document>.
>>>> 
>>>> 3. The SAX handler writes out the rest of the XML with a few changes,
>>>> appending it to that same output stream on which the original "half" was
>>>> written. (This does not seem to work.)
>>>> 
>>>> I was not seeing this as "overwriting" flowfile content, but, in my tiny 
>>>> little mind, I imagined an input stream, which I want to read exactly a) 
>>>> one-half, then again, b) one-whole time, and an output stream to which I 
>>>> start to write by copying (a), followed by a modification of (b) yet, the 
>>>> whole (b) or "second half." Then I'm done. I was thinking of the input 
>>>> stream as from the in-coming flowfile and a separate thing from the output 
>>>> stream which I see as being offered to me for my use in creating a new 
>>>> flowfile to transfer to. I guess this is not how it works.
>>>> 
>>>> My in-coming flowfiles can be megabytes in size. Copying to a string is 
>>>> not an option. Copying to a temporary file "isn't NiFi" as I understand 
>>>> it. I was hoping to avoid writing another processor or two to a) break up 
>>>> the flowfile into <document> ... </document> and (all the rest), fix (all 
>>>> the rest), then stitch the two back together in a later processor. I see 
>>>> having to coordinate the two halves of what used to be one file fraught 
>>>> with precarity and confusion, but I guess that's the solution I'm left 
>>>> with?
>>>> 
>>>> Thanks,
>>>> Russ
>>>> 
>>>> 
>>>> On 3/31/20 10:23 AM, Mark Payne wrote:
>>>>> Russ,
>>>>> 
>>>>> As far as I can tell, this is working exactly as expected.
>>>>> 
>>>>> To verify, I created a simple Integration test, as well, which I attached 
>>>>> below.
>>>>> 
>>>>> Let me outline what I *think* you’re trying to do here and please correct 
>>>>> me if I’m wrong:
>>>>> 
>>>>> 1. Read the content of the FlowFile. (Via session.read)
>>>>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>>>>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>>>>> 
>>>>> The third step is the part where I’m confused. You’re calling 
>>>>> session.write() again. In the callback, you’ll receive an InputStream 
>>>>> that contains the contents of the FlowFile (which have now been modified, 
>>>>> per Step 2). You’re also given an OutputStream to write the new content 
>>>>> to.
>>>>> If you then return without writing anything to the OutputStream, as in 
>>>>> the example that you attached, then yes, you’ll have erased all of the 
>>>>> FlowFile’s content.
>>>>> 
>>>>> It’s unclear to me exactly what you’re attempting to accomplish in the 
>>>>> third step. It *sounds* like you’re expecting the content of the 
>>>>> original/incoming FlowFile. But you’re not going to get that because 
>>>>> you’ve already overwritten that FlowFile’s content. If that is what 
>>>>> you’re trying to do, I think what you’d want to do is something more like 
>>>>> this:
>>>>> 
>>>>> FlowFile original = session.get();
>>>>> if (original == null) {
>>>>>   return;
>>>>> }
>>>>> 
>>>>> session.read(original, new InputStreamCallback() {…});
>>>>> 
>>>>> // Create a ‘child’ flow file whose content is equal to the original
>>>>> // FlowFile’s content.
>>>>> FlowFile childFlowFile = session.create(original);
>>>>> session.write(childFlowFile, new StreamCallback() {…});
>>>>> 
>>>>> // Read the original FlowFile’s content
>>>>> session.read(original, new InputStreamCallback() { … });
>>>>> 
>>>>> session.transfer(childFlowFile, REL_SUCCESS);
>>>>> session.remove(original); // or transfer to an ‘original’ relationship or
>>>>>                           // whatever makes sense for you.
>>>>> 
>>>>> 
>>>>> 
>>>>> Hope this helps!
>>>>> -Mark
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>> 
>>>>>> If I haven't worn out my welcome, here is the simplified code that 
>>>>>> should demonstrate either that I have miscoded your suggestions or that 
>>>>>> the API doesn't in fact work as advertised. First, the output. The code,
>>>>>> both JUnit test and processor, is attached, and the files are pretty
>>>>>> small.
>>>>>> 
>>>>>> Much thanks,
>>>>>> Russ
>>>>>> 
>>>>>> This is the input stream first time around (before copying) ===================================
>>>>>> * * * session.read( flowfile );
>>>>>>       Here's what's in input stream:
>>>>>> <cxml>
>>>>>> <document>
>>>>>>     This is the original document.
>>>>>> </document>
>>>>>> <metadata>
>>>>>> <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>> </metadata>
>>>>>> <demographics>
>>>>>> <date_of_birth>1980-07-01</date_of_birth>
>>>>>> <age>36</age>
>>>>>> </demographics>
>>>>>> </cxml>
>>>>>> 
>>>>>> And now, let's copy some of the input stream to the output stream =============================
>>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>>       Copying input stream to output stream up to </document>...
>>>>>>       The output stream has in it at this point:
>>>>>> <cxml>
>>>>>> <document>
>>>>>>     This is the original document.
>>>>>> </document>
>>>>>> 
>>>>>> [1. When we examine the output stream, it has what we expect.]
>>>>>> 
>>>>>> After copying, can we reopen input stream intact and does outputstream have what we think? ====
>>>>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>>>>       Here's what's in input stream:
>>>>>> <cxml>
>>>>>> <document>
>>>>>>     This is the original document.
>>>>>> </document>
>>>>>> 
>>>>>> [2. The input stream as reported just above is truncated by exactly the
>>>>>>     content we did not copy to the output stream. We expected to see the
>>>>>>     entire, original file, but the second half is gone.]
>>>>>> 
>>>>>>       Here's what's in the output stream at this point:
>>>>>>       (nothing)
>>>>>> 
>>>>>> [3. The content we copied to the output stream has disappeared. Does it
>>>>>>     disappear simply because we looked at it (printed it out here)?]
>>>>>> 
>>>>>> 
>>>>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>>>>> Russell
>>>>>>> 
>>>>>>> I recommend writing very simple code that does two successive read/write
>>>>>>> operations on basic data so you can make sure the API works as expected.
>>>>>>> Then add the XML bits.
>>>>>>> 
>>>>>>> Thanks
>>>>>>> 
>>>>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mikerthom...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> If these files are only a few MB at the most, you can also just export
>>>>>>>> them to a ByteArrayOutputStream. Just a thought.
>>>>>>>> 
>>>>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>>>> 
>>>>>>>>> Joe and Mike,
>>>>>>>>> 
>>>>>>>>> Sadly, I was not able to get very far on this. It seems that, to the
>>>>>>>>> extent to which I copy the first half of the contents of the input
>>>>>>>>> stream, I lose what comes after when I try to read again, basically, the
>>>>>>>>> second half comprising the <metadata> and <demographics> elements which I
>>>>>>>>> was hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>>>>> output to make it easier to read.
>>>>>>>>> 
>>>>>>>>> try
>>>>>>>>> {
>>>>>>>>>   InputStream inputStream = session.read( flowfile );
>>>>>>>>>   System.out.println( "This is the input stream first time around (before copying to output stream)..." );
>>>>>>>>>   System.out.println( StreamUtilities.fromStream( inputStream ) );
>>>>>>>>>   inputStream.close();
>>>>>>>>> }
>>>>>>>>> catch( IOException e )
>>>>>>>>> {
>>>>>>>>>   e.printStackTrace();
>>>>>>>>> }
>>>>>>>>> flowfile = session.write( flowfile, new StreamCallback()
>>>>>>>>> {
>>>>>>>>>   @Override
>>>>>>>>>   public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>>   {
>>>>>>>>>     System.out.println( "And now, let's copy..." );
>>>>>>>>>     CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
>>>>>>>>>   }
>>>>>>>>> } );
>>>>>>>>> try
>>>>>>>>> {
>>>>>>>>>   InputStream inputStream = session.read( flowfile );
>>>>>>>>>   System.out.println( "This is the input stream second time around (after copying)..." );
>>>>>>>>>   System.out.println( StreamUtilities.fromStream( inputStream ) );
>>>>>>>>>   inputStream.close();
>>>>>>>>> }
>>>>>>>>> catch( IOException e )
>>>>>>>>> {
>>>>>>>>>   e.printStackTrace();
>>>>>>>>> }
>>>>>>>>> // ...on to SAX parser which dies because the input has been truncated to
>>>>>>>>> // exactly what was written out to the output stream
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Output of above:
>>>>>>>>> 
>>>>>>>>> This is the input stream first time around (before copying to output stream)...
>>>>>>>>> <cxml>
>>>>>>>>>    <document>
>>>>>>>>>      This is the original document.
>>>>>>>>>    </document>
>>>>>>>>>    <metadata>
>>>>>>>>>      <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>>>>    </metadata>
>>>>>>>>>    <demographics>
>>>>>>>>>      <date_of_birth>1980-07-01</date_of_birth>
>>>>>>>>>      <age>36</age>
>>>>>>>>>    </demographics>
>>>>>>>>> </cxml>
>>>>>>>>> 
>>>>>>>>> And now, let's copy...
>>>>>>>>> This is the input stream second time around (after copying)...
>>>>>>>>> <cxml>
>>>>>>>>>    <document>
>>>>>>>>>      This is the original document.
>>>>>>>>>    </document>
>>>>>>>>> And now, we'll go on to the SAX parser...
>>>>>>>>> <cxml> <document> This is the original document. </document>
>>>>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>>>>> document structures must start and end within the same entity.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>>>>> tests that verify the good functioning of
>>>>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>>>>> 
>>>>>>>>> Thanks for looking at this again if you can,
>>>>>>>>> Russ
>>>>>>>>> 
>>>>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>>>>> you should be able to call write as many times as you need. just keep
>>>>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>>>>> 
>>>>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Mike,
>>>>>>>>>>> 
>>>>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do is
>>>>>>>>>>> something like this?
>>>>>>>>>>> 
>>>>>>>>>>>      public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>>>>>>>>>>>      {
>>>>>>>>>>>         FlowFile flowfile = session.get();
>>>>>>>>>>>         ...
>>>>>>>>>>> 
>>>>>>>>>>>         // this will be our resulting flowfile...
>>>>>>>>>>>         AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>>>>>>>>>>> 
>>>>>>>>>>>         /* Do some processing on the in-coming flowfile then close its input stream, but
>>>>>>>>>>>          * save the output stream for continued use.
>>>>>>>>>>>          */
>>>>>>>>>>>         session.write( flowfile, new StreamCallback()
>>>>>>>>>>>         {
>>>>>>>>>>>           @Override
>>>>>>>>>>>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>>>>           {
>>>>>>>>>>>             savedOutputStream.set( outputStream );
>>>>>>>>>>>             ...
>>>>>>>>>>> 
>>>>>>>>>>>             // processing puts some output on the output stream...
>>>>>>>>>>>             outputStream.write( etc. );
>>>>>>>>>>> 
>>>>>>>>>>>             inputStream.close();
>>>>>>>>>>>           }
>>>>>>>>>>>         } );
>>>>>>>>>>> 
>>>>>>>>>>>         /* Start over doing different processing on the (same/reopened) in-coming flowfile
>>>>>>>>>>>          * continuing to use the original output stream. It's our responsibility to close
>>>>>>>>>>>          * the saved output stream; NiFi closes the unused output stream opened, but
>>>>>>>>>>>          * ignored by us.
>>>>>>>>>>>          */
>>>>>>>>>>>         session.write( flowfile, new StreamCallback()
>>>>>>>>>>>         {
>>>>>>>>>>>           @Override
>>>>>>>>>>>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>>>>           {
>>>>>>>>>>>             outputStream = savedOutputStream.get(); // (discard the new output stream)
>>>>>>>>>>>             ...
>>>>>>>>>>> 
>>>>>>>>>>>             // processing puts (some more) output on the original output stream...
>>>>>>>>>>>             outputStream.write( etc. );
>>>>>>>>>>> 
>>>>>>>>>>>             outputStream.close();
>>>>>>>>>>>           }
>>>>>>>>>>>         } );
>>>>>>>>>>> 
>>>>>>>>>>>         session.transfer( flowfile, etc. );
>>>>>>>>>>>      }
>>>>>>>>>>> 
>>>>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>>>>> opened for me (the second time) and replace it with the original one
>>>>>>>>>>> which was probably closed when the first call to
>>>>>>>>>>> session.write() finished. What's on these streams is way too big for me
>>>>>>>>>>> to put them into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>>>>> 
>>>>>>>>>>> Russ
>>>>>>>>>>> 
>>>>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be able
>>>>>>>>>>>> to rerun that as many times as you want provided you properly close it.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an incoming
>>>>>>>>>>>>> flowfile that's in XML. Except that this particular XML is in essence
>>>>>>>>>>>>> two different files, and I would like to split it, then read and process
>>>>>>>>>>>>> the first "half" (which starts a couple of lines, i.e. XML elements,
>>>>>>>>>>>>> into the file) without using the SAX parser. At the end, I would stream
>>>>>>>>>>>>> the output of the first half, then the SAX-processed second half.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> So, in short:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>    1. process the incoming flowfile for the early content not using SAX,
>>>>>>>>>>>>>       but merely copying as-is; at all cost I must avoid "reassembling"
>>>>>>>>>>>>>       the first half using my SAX handler (what I'm doing now),
>>>>>>>>>>>>>    2. output the first part down the output stream to the resulting
>>>>>>>>>>>>>       flowfile,
>>>>>>>>>>>>>    3. (re)process the incoming flowfile using SAX (and I can just skip
>>>>>>>>>>>>>       over the first bit) and spitting the result of this second part out
>>>>>>>>>>>>>       down the output stream of the resulting flowfile.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an input
>>>>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>>>>> developer question and more a Java question. I have looked at it that
>>>>>>>>>>>>> way too, but, if one of you knows (particularly NiFi) best practice, I
>>>>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>> 
> 
