Russ,

OK, so then I think the pattern you’d want to follow would be something like 
this:

final FlowFile original = session.get();
if (original == null) {
    return;
}

FlowFile output = session.create(original);

// Begin writing to the 'output' flowfile
output = session.write(output, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream out) throws IOException {

        // read from original FlowFile
        session.read(original, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                copyFirstHalf(in, out);
            }
        });

        // read from original FlowFile a second time. Use a SAX parser to parse
        // it and write to the end of the 'output' flowfile
        session.read(original, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                processWithSaxParser(in, out);
            }
        });
    }
});

session.transfer(output, REL_SUCCESS);
session.remove(original);
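
To try the two-pass idea outside of NiFi, here is a minimal plain-Java sketch. It is only an illustration under assumptions: the class name TwoPassCopy and the bodies of copyFirstHalf() and processWithSaxParser() are hypothetical (your real ones will differ), the Supplier stands in for calling session.read(original) twice, and the ByteArrayOutputStream stands in for the single output stream handed to the session.write() callback. The handler ignores attributes and does no modification, just to keep it short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

class TwoPassCopy {
    private static final byte[] END = "</document>".getBytes(StandardCharsets.UTF_8);

    // Pass 1 (hypothetical body): copy bytes verbatim up to and including
    // the first "</document>", then stop reading.
    public static void copyFirstHalf(InputStream in, OutputStream out) throws IOException {
        int matched = 0;
        int b;
        while ((b = in.read()) != -1) {
            out.write(b);
            // '<' occurs only at the start of the marker, so on a mismatch
            // the match can only restart at 1 (on '<') or 0.
            matched = (b == END[matched]) ? matched + 1 : (b == END[0] ? 1 : 0);
            if (matched == END.length) {
                return;
            }
        }
    }

    // Pass 2 (hypothetical body): parse the whole stream again, skip everything
    // up to </document>, and append what follows to the same output stream.
    public static void processWithSaxParser(InputStream in, OutputStream out) throws Exception {
        final PrintStream ps = new PrintStream(out, false, "UTF-8");
        DefaultHandler handler = new DefaultHandler() {
            private boolean pastDocument = false;

            @Override
            public void startElement(String uri, String local, String qName, Attributes atts) {
                if (pastDocument) {
                    ps.print("<" + qName + ">"); // attributes omitted in this sketch
                }
            }

            @Override
            public void endElement(String uri, String local, String qName) {
                if (qName.equals("document")) {
                    pastDocument = true;
                } else if (pastDocument) {
                    ps.print("</" + qName + ">");
                }
            }

            @Override
            public void characters(char[] ch, int start, int length) {
                if (pastDocument) {
                    ps.print(escape(new String(ch, start, length)));
                }
            }
        };
        SAXParserFactory.newInstance().newSAXParser().parse(in, handler);
        ps.flush();
    }

    private static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    public static void main(String[] args) throws Exception {
        final byte[] content = ("<cxml>\n<document>\n    This is the original document.\n</document>\n"
                + "<metadata>\n<date_of_service>2016-06-28 13:23</date_of_service>\n</metadata>\n</cxml>")
                .getBytes(StandardCharsets.UTF_8);
        // Each call opens a fresh stream over the same, unmodified content.
        Supplier<InputStream> source = () -> new ByteArrayInputStream(content);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream in = source.get()) {
            copyFirstHalf(in, out);
        }
        try (InputStream in = source.get()) {
            processWithSaxParser(in, out);
        }
        System.out.print(out.toString("UTF-8"));
    }
}
```

The point of the sketch is that the source is opened twice and never written to, while there is exactly one output stream that both passes append to. That is the same shape as the session.create() / session.write() / session.read() pattern above.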


Thanks
-Mark


> On Mar 31, 2020, at 2:04 PM, Russell Bateman <[email protected]> wrote:
> 
> Mark,
> 
> Thanks for getting back. My steps are:
> 
> 1. Read the "first half" of the input stream copying it to the output stream. 
> This is because I need to preserve the exact form of it (spacing, 
> indentation, lines, etc.) without change whatsoever. If I
> 
> 2. Reopen the stream from the beginning with a SAX parser. Its handler, which 
> I wrote, will ignore the original part that I'm holding for 
> sacred--everything between <document> and </document>.
> 
> 3. The SAX handler writes the rest of the XML with a few changes out 
> appending it to that same output stream on which the original "half" was 
> written. (This does not seem to work.)
> 
> I was not seeing this as "overwriting" flowfile content, but, in my tiny 
> little mind, I imagined an input stream, which I want to read exactly a) 
> one-half, then again, b) one-whole time, and an output stream to which I 
> start to write by copying (a), followed by a modification of (b) yet, the 
> whole (b) or "second half." Then I'm done. I was thinking of the input stream 
> as from the in-coming flowfile and a separate thing from the output stream 
> which I see as being offered to me for my use in creating a new flowfile to 
> transfer to. I guess this is not how it works.
> 
> My in-coming flowfiles can be megabytes in size. Copying to a string is not 
> an option. Copying to a temporary file "isn't NiFi" as I understand it. I was 
> hoping to avoid writing another processor or two to a) break up the flowfile 
> into <document> ... </document> and (all the rest), fix (all the rest), then 
> stitch the two back together in a later processor. I see having to coordinate 
> the two halves of what used to be one file fraught with precarity and 
> confusion, but I guess that's the solution I'm left with?
> 
> Thanks,
> Russ
> 
> 
> On 3/31/20 10:23 AM, Mark Payne wrote:
>> Russ,
>> 
>> As far as I can tell, this is working exactly as expected.
>> 
>> To verify, I created a simple Integration test, as well, which I attached 
>> below.
>> 
>> Let me outline what I *think* you’re trying to do here and please correct me 
>> if I’m wrong:
>> 
>> 1. Read the content of the FlowFile. (Via session.read)
>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>> 
>> The third step is the part where I’m confused. You’re calling 
>> session.write() again. In the callback, you’ll receive an InputStream that 
>> contains the contents of the FlowFile (which have now been modified, per 
>> Step 2). You’re also given an OutputStream to write the new content to.
>> If you then return without writing anything to the OutputStream, as in the 
>> example that you attached, then yes, you’ll have erased all of the 
>> FlowFile’s content.
>> 
>> It’s unclear to me exactly what you’re attempting to accomplish in the third 
>> step. It *sounds* like you’re expecting the content of the original/incoming 
>> FlowFile. But you’re not going to get that because you’ve already 
>> overwritten that FlowFile’s content. If that is what you’re trying to do, I 
>> think what you’d want to do is something more like this:
>> 
>> FlowFile original = session.get();
>> if (original == null) {
>>   return;
>> }
>> 
>> session.read(original, new InputStreamCallback() {…});
>> 
>> // Create a ‘child’ flow file whose content is equal to the original
>> // FlowFile’s content.
>> FlowFile childFlowFile = session.create(original);
>> childFlowFile = session.write(childFlowFile, new StreamCallback() {…});
>> 
>> // Read the original FlowFile’s content
>> session.read(original, new InputStreamCallback() { … });
>> 
>> session.transfer(childFlowFile, REL_SUCCESS);
>> session.remove(original); // or transfer to an ‘original’ relationship or
>>                           // whatever makes sense for you.
>> 
>> 
>> 
>> Hope this helps!
>> -Mark
>> 
>> 
>> 
>> 
>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <[email protected]> wrote:
>>> 
>>> If I haven't worn out my welcome, here is the simplified code that should 
>>> demonstrate either that I have miscoded your suggestions or that the API 
>>> doesn't in fact work as advertised. First, the output. The code, both JUnit 
>>> test and processor are attached and the files are pretty small.
>>> 
>>> Much thanks,
>>> Russ
>>> 
>>> This is the input stream first time around (before copying) 
>>> ===================================
>>> * * * session.read( flowfile );
>>>       Here's what's in input stream:
>>> <cxml>
>>> <document>
>>>     This is the original document.
>>> </document>
>>> <metadata>
>>> <date_of_service>2016-06-28 13:23</date_of_service>
>>> </metadata>
>>> <demographics>
>>> <date_of_birth>1980-07-01</date_of_birth>
>>> <age>36</age>
>>> </demographics>
>>> </cxml>
>>> 
>>> And now, let's copy some of the input stream to the output stream 
>>> =============================
>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>       Copying input stream to output stream up to </document>...
>>>       The output stream has in it at this point:
>>> <cxml>
>>> <document>
>>>     This is the original document.
>>> </document>
>>> 
>>> [1. When we examine the output stream, it has what we expect.]
>>> 
>>> After copying, can we reopen input stream intact and does outputstream have 
>>> what we think? ====
>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>>       Here's what's in input stream:
>>> <cxml>
>>> <document>
>>>     This is the original document.
>>> </document>
>>> 
>>> [2. The input stream as reported just above is truncated by exactly the
>>>     content we did not copy to the output stream. We expected to see the
>>>     entire, original file, but the second half is gone.]
>>> 
>>>       Here's what's in the output stream at this point:
>>> (nothing)
>>> 
>>> [3. The content we copied to the output stream has disappeared. Does it
>>>     disappear simply because we looked at it (printed it out here)?]
>>> 
>>> 
>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>> Russell
>>>> 
>>>> I recommend writing very simple code that does two successive read/write
>>>> operations on basic data so you can make sure the API works as expected.
>>>> Then add the XML bits.
>>>> 
>>>> Thanks
>>>> 
>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <[email protected]> wrote:
>>>> 
>>>>> If these files are only a few MB at the most, you can also just export
>>>>> them to a ByteArrayOutputStream. Just a thought.
>>>>> 
>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <[email protected]> wrote:
>>>>> 
>>>>>> Joe and Mike,
>>>>>> 
>>>>>> Sadly, I was not able to get very far on this. It seems that, to the
>>>>>> extent that I copy the first half of the contents of the input stream, I
>>>>>> lose what comes after when I try to read again: basically, the second
>>>>>> half, comprising the <metadata> and <demographics> elements, which I was
>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>> output to make it easier to read.
>>>>>> 
>>>>>> try
>>>>>> {
>>>>>>   InputStream inputStream = session.read( flowfile );
>>>>>>   System.out.println( "This is the input stream first time around (before copying to output stream)..." );
>>>>>>   System.out.println( StreamUtilities.fromStream( inputStream ) );
>>>>>>   inputStream.close();
>>>>>> }
>>>>>> catch( IOException e )
>>>>>> {
>>>>>>   e.printStackTrace();
>>>>>> }
>>>>>> flowfile = session.write( flowfile, new StreamCallback()
>>>>>> {
>>>>>>   @Override
>>>>>>   public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>   {
>>>>>>     System.out.println( "And now, let's copy..." );
>>>>>>     CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
>>>>>>   }
>>>>>> } );
>>>>>> try
>>>>>> {
>>>>>>   InputStream inputStream = session.read( flowfile );
>>>>>>   System.out.println( "This is the input stream second time around (after copying)..." );
>>>>>>   System.out.println( StreamUtilities.fromStream( inputStream ) );
>>>>>>   inputStream.close();
>>>>>> }
>>>>>> catch( IOException e )
>>>>>> {
>>>>>>   e.printStackTrace();
>>>>>> }
>>>>>> // ...on to SAX parser which dies because the input has been truncated to
>>>>>> // exactly what was written out to the output stream
>>>>>> 
>>>>>> 
>>>>>> Output of above:
>>>>>> 
>>>>>> This is the input stream first time around (before copying to output
>>>>>> stream)...
>>>>>> <cxml>
>>>>>>    <document>
>>>>>>      This is the original document.
>>>>>>    </document>
>>>>>>    <metadata>
>>>>>>      <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>>    </metadata>
>>>>>>    <demographics>
>>>>>>      <date_of_birth>1980-07-01</date_of_birth>
>>>>>>      <age>36</age>
>>>>>>    </demographics>
>>>>>> </cxml>
>>>>>> 
>>>>>> And now, let's copy...
>>>>>> This is the input stream second time around (after copying)...
>>>>>> <cxml>
>>>>>>    <document>
>>>>>>      This is the original document.
>>>>>>    </document>
>>>>>> And now, we'll go on to the SAX parser...
>>>>>> <cxml> <document> This is the original document. </document>
>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>> document structures must start and end within the same entity.
>>>>>> 
>>>>>> 
>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>> tests that verify the good functioning of
>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>> 
>>>>>> Thanks for looking at this again if you can,
>>>>>> Russ
>>>>>> 
>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>> you should be able to call write as many times as you need.  just keep
>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>> 
>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <[email protected]> wrote:
>>>>>>> 
>>>>>>>> Mike,
>>>>>>>> 
>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>>>>>> is something like this?
>>>>>>>> 
>>>>>>>>      public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>>>>>>>>      {
>>>>>>>>         FlowFile flowfile = session.get();
>>>>>>>>         ...
>>>>>>>> 
>>>>>>>>         // this will be our resulting flowfile...
>>>>>>>>         AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>>>>>>>> 
>>>>>>>>         /* Do some processing on the in-coming flowfile then close its input stream, but
>>>>>>>>          * save the output stream for continued use.
>>>>>>>>          */
>>>>>>>>         session.write( flowfile, new StreamCallback()
>>>>>>>>         {
>>>>>>>>           @Override
>>>>>>>>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>           {
>>>>>>>>             savedOutputStream.set( outputStream );
>>>>>>>>             ...
>>>>>>>> 
>>>>>>>>             // processing puts some output on the output stream...
>>>>>>>>             outputStream.write( etc. );
>>>>>>>> 
>>>>>>>>             inputStream.close();
>>>>>>>>           }
>>>>>>>>         } );
>>>>>>>> 
>>>>>>>>         /* Start over doing different processing on the (same/reopened) in-coming flowfile
>>>>>>>>          * continuing to use the original output stream. It's our responsibility to close
>>>>>>>>          * the saved output stream, NiFi closes the unused output stream opened, but
>>>>>>>>          * ignored by us.
>>>>>>>>          */
>>>>>>>>         session.write( flowfile, new StreamCallback()
>>>>>>>>         {
>>>>>>>>           @Override
>>>>>>>>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>           {
>>>>>>>>             outputStream = savedOutputStream.get(); // (discard the new output stream)
>>>>>>>>             ...
>>>>>>>> 
>>>>>>>>             // processing puts (some more) output on the original output stream...
>>>>>>>>             outputStream.write( etc. );
>>>>>>>> 
>>>>>>>>             outputStream.close();
>>>>>>>>           }
>>>>>>>>         } );
>>>>>>>> 
>>>>>>>>         session.transfer( flowfile, etc. );
>>>>>>>>      }
>>>>>>>> 
>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>> opened for me (the second time) and replace it with the original one,
>>>>>>>> which was probably closed when the first call to session.write()
>>>>>>>> finished. What's on these streams is way too big for me to put into
>>>>>>>> temporary memory, say, a ByteArrayOutputStream.
>>>>>>>> 
>>>>>>>> Russ
>>>>>>>> 
>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>>>>>> able to rerun that as many times as you want provided you properly
>>>>>>>>> close it.
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an
>>>>>>>>>> incoming flowfile that's in XML. Except that this particular XML is,
>>>>>>>>>> in essence, two different files, and I would like to split, read and
>>>>>>>>>> process the first "half" (which starts a couple of lines, i.e. XML
>>>>>>>>>> elements, into the file) not using the SAX parser. At the end, I
>>>>>>>>>> would stream the output of the first half, then the SAX-processed
>>>>>>>>>> second half.
>>>>>>>>>> 
>>>>>>>>>> So, in short:
>>>>>>>>>> 
>>>>>>>>>>    1. process the incoming flowfile for the early content not using
>>>>>>>>>>       SAX, but merely copying as-is; at all cost I must avoid
>>>>>>>>>>       "reassembling" the first half using my SAX handler (what I'm
>>>>>>>>>>       doing now),
>>>>>>>>>>    2. output the first part down the output stream to the resulting
>>>>>>>>>>       flowfile,
>>>>>>>>>>    3. (re)process the incoming flowfile using SAX (and I can just
>>>>>>>>>>       skip over the first bit) and spitting the result of this second
>>>>>>>>>>       part out down the output stream of the resulting flowfile.
>>>>>>>>>> 
>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>>>>>>>> input stream twice (or one-half plus one times). Maybe it's less a
>>>>>>>>>> NiFi developer question and more a Java question. I have looked at it
>>>>>>>>>> that way too, but, if one of you knows (particularly NiFi) best
>>>>>>>>>> practice, I would very much like to hear about it.
>>>>>>>>>> 
>>>>>>>>>> Thanks.
>>>>>>>>>> 
>>>>>>>>>> 
>>> 
>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>> 
> 
