Russ,
OK, so then I think the pattern you’d want to follow would be something like
this:
FlowFile original = session.get();
if (original == null) {
    return;
}

FlowFile output = session.create(original);

// Begin writing to the 'output' FlowFile.
output = session.write(output, new OutputStreamCallback() {
    @Override
    public void process(OutputStream out) throws IOException {
        // Read from the original FlowFile.
        session.read(original, new InputStreamCallback() {
            @Override
            public void process(InputStream in) throws IOException {
                copyFirstHalf(in, out);
            }
        });

        // Read from the original FlowFile a second time. Use a SAX parser to
        // parse it and write to the end of the 'output' FlowFile.
        session.read(original, new InputStreamCallback() {
            @Override
            public void process(InputStream in) throws IOException {
                processWithSaxParser(in, out);
            }
        });
    }
});

session.transfer(output, REL_SUCCESS);
session.remove(original);
Thanks
-Mark
> On Mar 31, 2020, at 2:04 PM, Russell Bateman <[email protected]> wrote:
>
> Mark,
>
> Thanks for getting back. My steps are:
>
> 1. Read the "first half" of the input stream, copying it to the output stream.
> This is because I need to preserve its exact form (spacing, indentation,
> lines, etc.) without any change whatsoever.
>
> 2. Reopen the stream from the beginning with a SAX parser. Its handler, which
> I wrote, will ignore the original part that I'm holding for
> sacred--everything between <document> and </document>.
>
> 3. The SAX handler writes the rest of the XML, with a few changes, appending
> it to the same output stream to which the original "half" was written.
> (This does not seem to work.)
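The steps above can be sketched in plain Java, with in-memory streams standing in for FlowFile content (the helper and class names here are illustrative, not NiFi API):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not NiFi API: the "first half" is copied verbatim up to
// and including </document>; a second, fresh stream over the same bytes is then
// available intact for the SAX pass.
public class TwoPassSketch
{
    // Step 1: copy lines exactly as-is until (and including) the </document> line.
    static String copyFirstHalf( InputStream in ) throws IOException
    {
        StringBuilder copy = new StringBuilder();
        BufferedReader reader = new BufferedReader( new InputStreamReader( in, StandardCharsets.UTF_8 ) );
        String line;
        while( ( line = reader.readLine() ) != null )
        {
            copy.append( line ).append( '\n' );
            if( line.contains( "</document>" ) )
                break;                      // stop: the rest belongs to the SAX pass
        }
        return copy.toString();
    }

    public static void main( String[] args ) throws IOException
    {
        byte[] content = ( "<cxml>\n<document>\n  original text\n</document>\n"
                         + "<metadata/>\n</cxml>\n" ).getBytes( StandardCharsets.UTF_8 );

        // Steps 1-2: first pass copies the "sacred" half to the output.
        String firstHalf = copyFirstHalf( new ByteArrayInputStream( content ) );

        // Step 3: a brand-new stream over the same content is still complete,
        // so a SAX parser (not shown) would see the whole document.
        InputStream secondPass = new ByteArrayInputStream( content );
        System.out.println( firstHalf );
    }
}
```

The key observation is that each pass gets its own fresh InputStream over the same content; in NiFi terms, that corresponds to calling session.read() once per pass rather than reusing one stream.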
>
> I was not seeing this as "overwriting" flowfile content. In my tiny little
> mind, I imagined an input stream that I want to read (a) exactly one-half of,
> then again (b) one whole time, and an output stream to which I start writing
> by copying (a), followed by a modified version of (b), the whole (b) or
> "second half." Then I'm done. I was thinking of the input stream as coming
> from the in-coming flowfile, and as a separate thing from the output stream,
> which I see as being offered to me for my use in creating a new flowfile to
> transfer to. I guess this is not how it works.
>
> My in-coming flowfiles can be megabytes in size. Copying to a string is not
> an option. Copying to a temporary file "isn't NiFi" as I understand it. I was
> hoping to avoid writing another processor or two to a) break up the flowfile
> into <document> ... </document> and (all the rest), fix (all the rest), then
> stitch the two back together in a later processor. I see having to coordinate
> the two halves of what used to be one file as fraught with precarity and
> confusion, but I guess that's the solution I'm left with?
>
> Thanks,
> Russ
>
>
> On 3/31/20 10:23 AM, Mark Payne wrote:
>> Russ,
>>
>> As far as I can tell, this is working exactly as expected.
>>
>> To verify, I created a simple Integration test, as well, which I attached
>> below.
>>
>> Let me outline what I *think* you’re trying to do here and please correct me
>> if I’m wrong:
>>
>> 1. Read the content of the FlowFile. (Via session.read)
>> 2. Overwrite the content of the FlowFile. (This is done by session.write)
>> 3. Overwrite the content of the FlowFile again. (Via session.write)
>>
>> The third step is the part where I’m confused. You’re calling
>> session.write() again. In the callback, you’ll receive an InputStream that
>> contains the contents of the FlowFile (which have now been modified, per
>> Step 2). You’re also given an OutputStream to write the new content to.
>> If you then return without writing anything to the OutputStream, as in the
>> example that you attached, then yes, you’ll have erased all of the
>> FlowFile’s content.
>>
>> It’s unclear to me exactly what you’re attempting to accomplish in the third
>> step. It *sounds* like you’re expecting the content of the original/incoming
>> FlowFile. But you’re not going to get that because you’ve already
>> overwritten that FlowFile’s content. If that is what you’re trying to do, I
>> think what you’d want to do is something more like this:
>>
>> FlowFile original = session.get();
>> if (original == null) {
>>     return;
>> }
>>
>> session.read(original, new InputStreamCallback() {...});
>>
>> // Create a 'child' FlowFile whose content is equal to the original FlowFile's content.
>> FlowFile childFlowFile = session.create(original);
>> session.write(childFlowFile, new StreamCallback() {...});
>>
>> // Read the original FlowFile's content.
>> session.read(original, new InputStreamCallback() {...});
>>
>> session.transfer(childFlowFile, REL_SUCCESS);
>> session.remove(original); // or transfer to an 'original' relationship, or
>> // whatever makes sense for you.
>>
>>
>>
>> Hope this helps!
>> -Mark
>>
>>
>>
>>
>>> On Mar 30, 2020, at 4:23 PM, Russell Bateman <[email protected]> wrote:
>>>
>>> If I haven't worn out my welcome, here is the simplified code that should
>>> demonstrate either that I have miscoded your suggestions or that the API
>>> doesn't in fact work as advertised. First, the output. The code, both the
>>> JUnit test and the processor, is attached, and the files are pretty small.
>>>
>>> Much thanks,
>>> Russ
>>>
>>> This is the input stream first time around (before copying)
>>> ===================================
>>> * * * session.read( flowfile );
>>> Here's what's in input stream:
>>> <cxml>
>>> <document>
>>>   This is the original document.
>>> </document>
>>> <metadata>
>>>   <date_of_service>2016-06-28 13:23</date_of_service>
>>> </metadata>
>>> <demographics>
>>>   <date_of_birth>1980-07-01</date_of_birth>
>>>   <age>36</age>
>>> </demographics>
>>> </cxml>
>>>
>>> And now, let's copy some of the input stream to the output stream
>>> =============================
>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>> Copying input stream to output stream up to </document>...
>>> The output stream has in it at this point:
>>> <cxml>
>>> <document>
>>>   This is the original document.
>>> </document>
>>> [1. When we examine the output stream, it has what we expect.]
>>>
>>> After copying, can we reopen the input stream intact, and does the output
>>> stream have what we think? ====
>>> * * * flowfile = session.write( flowfile, new StreamCallback() ...
>>> Here's what's in input stream:
>>> <cxml>
>>> <document>
>>>   This is the original document.
>>> </document>
>>>
>>> [2. The input stream as reported just above is truncated by exactly the
>>> content we did not copy to the output stream. We expected to see the
>>> entire, original file, but the second half is gone.]
>>>
>>> Here's what's in the output stream at this point:
>>> (nothing)
>>>
>>> [3. The content we copied to the output stream has disappeared. Does it
>>> disappear simply because we looked at it (printed it out here)?]
>>>
>>>
>>> On 3/29/20 5:05 AM, Joe Witt wrote:
>>>> Russell
>>>>
>>>> I recommend writing very simple code that does two successive read/write
>>>> operations on basic data so you can make sure the API works as expected.
>>>> Then add the xml bits.
>>>>
>>>> Thanks
>>>>
>>>>> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <[email protected]> wrote:
>>>>
>>>>> If these files are only a few MB at the most, you can also just export
>>>>> them to a ByteArrayOutputStream. Just a thought.
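A plain-Java sketch of the buffering idea Mike mentions (assuming the content really is only a few MB; names are illustrative, not NiFi API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch: drain the stream once into a ByteArrayOutputStream, then open as
// many fresh in-memory InputStreams over those bytes as there are passes.
public class BufferOnce
{
    static byte[] drain( InputStream in ) throws IOException
    {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int count;
        while( ( count = in.read( chunk ) ) != -1 )
            buffer.write( chunk, 0, count );
        return buffer.toByteArray();
    }

    public static void main( String[] args ) throws IOException
    {
        // In a processor, this stream would come from session.read( flowfile ).
        InputStream source = new ByteArrayInputStream( "<cxml>...</cxml>".getBytes() );
        byte[] bytes = drain( source );

        InputStream firstPass  = new ByteArrayInputStream( bytes ); // copy the first half
        InputStream secondPass = new ByteArrayInputStream( bytes ); // SAX-parse the whole
    }
}
```

The trade-off is exactly the one discussed later in the thread: the whole content must fit comfortably in heap memory.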
>>>>>
>>>>>> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <[email protected]> wrote:
>>>>>
>>>>>> Joe and Mike,
>>>>>>
>>>>>> Sadly, I was not able to get very far on this. It seems that, to the
>>>>>> extent I copy the first half of the contents of the input stream, I
>>>>>> lose what comes after when I try to read again; basically, the second
>>>>>> half comprising the <metadata> and <demographics> elements which I was
>>>>>> hoping to SAX-parse. Here's code and output. I have highlighted the
>>>>>> output to make it easier to read.
>>>>>>
>>>>>> ? <#>
>>>>>> |try|
>>>>>> |{|
>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>> |||System.out.println( ||"This is the input stream first time around
>>>>>> (before copying to output stream)..."| |);|
>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>> |||inputStream.close();|
>>>>>> |}|
>>>>>> |catch||( IOException e )|
>>>>>> |{|
>>>>>> |||e.printStackTrace();|
>>>>>> |}|
>>>>>> |flowfile = session.write( flowfile, ||new| |StreamCallback()|
>>>>>> |{|
>>>>>> |||@Override|
>>>>>> |||public| |void| |process( InputStream inputStream, OutputStream
>>>>>> outputStream ) ||throws| |IOException|
>>>>>> |||{|
>>>>>> |||System.out.println( ||"And now, let's copy..."| |);|
>>>>>> |||CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream,
>>>>>> outputStream );|
>>>>>> |||}|
>>>>>> |} );|
>>>>>> |try|
>>>>>> |{|
>>>>>> |||InputStream inputStream = session.read( flowfile );|
>>>>>> |||System.out.println( ||"This is the input stream second time around
>>>>>> (after copying)..."| |);|
>>>>>> |||System.out.println( StreamUtilities.fromStream( inputStream ) );|
>>>>>> |||inputStream.close();|
>>>>>> |}|
>>>>>> |catch||( IOException e )|
>>>>>> |{|
>>>>>> |||e.printStackTrace();|
>>>>>> |}|
>>>>>> |// ...on to SAX parser which dies because the input has been truncated
>>>>> to|
>>>>>> |// exactly what was written out to the output stream|
>>>>>>
>>>>>>
>>>>>> Output of above:
>>>>>>
>>>>>> This is the input stream first time around (before copying to output
>>>>>> stream)...
>>>>>> <cxml>
>>>>>> <document>
>>>>>> This is the original document.
>>>>>> </document>
>>>>>> <metadata>
>>>>>> <date_of_service>2016-06-28 13:23</date_of_service>
>>>>>> </metadata>
>>>>>> <demographics>
>>>>>> <date_of_birth>1980-07-01</date_of_birth>
>>>>>> <age>36</age>
>>>>>> </demographics>
>>>>>> </cxml>
>>>>>>
>>>>>> And now, let's copy...
>>>>>> This is the input stream second time around (after copying)...
>>>>>> <cxml>
>>>>>> <document>
>>>>>> This is the original document.
>>>>>> </document>
>>>>>> And now, we'll go on to the SAX parser...
>>>>>> <cxml> <document> This is the original document. </document>
>>>>>> [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>>>>> org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>>>>> document structures must start and end within the same entity.
>>>>>>
>>>>>>
>>>>>> I left off the code that prints, "And now, we'll go on to the SAX
>>>>>> parser..." It's in the next flowfile = session.write( ... ). I have unit
>>>>>> tests that verify the good functioning of
>>>>>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>>>>>> "file" is truncated; SAX finds the first "half" just fine, but there is
>>>>>> no second "half". If I comment out copying from input stream to output
>>>>>> stream, the error doesn't occur--the whole document is there.
>>>>>>
>>>>>> Thanks for looking at this again if you can,
>>>>>> Russ
>>>>>>
>>>>>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>>>>>> you should be able to call write as many times as you need. just keep
>>>>>>> using the resulting flowfile reference into the next call.
>>>>>>>
>>>>>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <[email protected]> wrote:
>>>>>>>
>>>>>>>> Mike,
>>>>>>>>
>>>>>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>>>>>> is something like this?
>>>>>>>>
>>>>>>>> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>>>>>>>> {
>>>>>>>>     FlowFile flowfile = session.get();
>>>>>>>>     ...
>>>>>>>>
>>>>>>>>     // this will be our resulting flowfile...
>>>>>>>>     AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>>>>>>>>
>>>>>>>>     /* Do some processing on the in-coming flowfile, then close its input stream,
>>>>>>>>      * but save the output stream for continued use.
>>>>>>>>      */
>>>>>>>>     session.write( flowfile, new StreamCallback()
>>>>>>>>     {
>>>>>>>>         @Override
>>>>>>>>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>         {
>>>>>>>>             savedOutputStream.set( outputStream );
>>>>>>>>             ...
>>>>>>>>
>>>>>>>>             // processing puts some output on the output stream...
>>>>>>>>             outputStream.write( etc. );
>>>>>>>>
>>>>>>>>             inputStream.close();
>>>>>>>>         }
>>>>>>>>     } );
>>>>>>>>
>>>>>>>>     /* Start over doing different processing on the (same/reopened) in-coming
>>>>>>>>      * flowfile, continuing to use the original output stream. It's our
>>>>>>>>      * responsibility to close the saved output stream; NiFi closes the unused
>>>>>>>>      * output stream it opened but we ignored.
>>>>>>>>      */
>>>>>>>>     session.write( flowfile, new StreamCallback()
>>>>>>>>     {
>>>>>>>>         @Override
>>>>>>>>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>>>>>         {
>>>>>>>>             outputStream = savedOutputStream.get(); // (discard the new output stream)
>>>>>>>>             ...
>>>>>>>>
>>>>>>>>             // processing puts (some more) output on the original output stream...
>>>>>>>>             outputStream.write( etc. );
>>>>>>>>
>>>>>>>>             outputStream.close();
>>>>>>>>         }
>>>>>>>>     } );
>>>>>>>>
>>>>>>>>     session.transfer( flowfile, etc. );
>>>>>>>> }
>>>>>>>>
>>>>>>>> I'm wondering if this will work to "discard" the new output stream
>>>>>>>> opened for me (the second time) and replace it with the original one,
>>>>>>>> which was probably closed when the first call to session.write()
>>>>>>>> finished. What's on these streams is way too big for me to put them
>>>>>>>> into temporary memory, say, a ByteArrayOutputStream.
>>>>>>>>
>>>>>>>> Russ
>>>>>>>>
>>>>>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>>>>>> able to rerun that as many times as you want, provided you properly
>>>>>>>>> close it.
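Mike's point can be illustrated in plain Java. In this sketch the Supplier stands in for session.read( flowfile ), handing back a fresh stream over the same content on every call; it is not NiFi API:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Each "read" obtains a fresh InputStream over the same underlying content,
// so reading can be repeated as often as needed, provided each stream is
// properly closed (the try-with-resources below does that).
public class RereadDemo
{
    static String readOnce( Supplier< InputStream > open ) throws IOException
    {
        try( InputStream in = open.get() )    // fresh stream, closed on exit
        {
            return new String( in.readAllBytes(), StandardCharsets.UTF_8 );
        }
    }

    public static void main( String[] args ) throws IOException
    {
        byte[] content = "<cxml/>".getBytes( StandardCharsets.UTF_8 );
        Supplier< InputStream > open = () -> new ByteArrayInputStream( content );

        String first  = readOnce( open );
        String second = readOnce( open );     // same content the second time
        System.out.println( first.equals( second ) );   // prints "true"
    }
}
```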
>>>>>>>>>
>>>>>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> In my custom processor, I'm using a SAX parser to process an incoming
>>>>>>>>>> flowfile that's in XML. Except that this particular XML is in essence
>>>>>>>>>> two different files, and I would like to split, read, and process the
>>>>>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>>>>>> file, not using the SAX parser. At the end, I would stream the output
>>>>>>>>>> of the first half, then the SAX-processed second half.
>>>>>>>>>>
>>>>>>>>>> So, in short:
>>>>>>>>>>
>>>>>>>>>> 1. process the incoming flowfile for the early content not using SAX,
>>>>>>>>>>    but merely copying as-is; at all cost I must avoid "reassembling"
>>>>>>>>>>    the first half using my SAX handler (what I'm doing now),
>>>>>>>>>> 2. output the first part down the output stream to the resulting flowfile,
>>>>>>>>>> 3. (re)process the incoming flowfile using SAX (and I can just skip
>>>>>>>>>>    over the first bit), spitting the result of this second part out
>>>>>>>>>>    down the output stream of the resulting flowfile.
>>>>>>>>>>
>>>>>>>>>> I guess this is tantamount to asking how, in Java, I can read an input
>>>>>>>>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
>>>>>>>>>> developer question and more a Java question. I have looked at it that
>>>>>>>>>> way too, but if one of you knows (particularly NiFi) best practice, I
>>>>>>>>>> would very much like to hear about it.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>>
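Since the underlying question, reading a Java input stream twice, runs through the whole thread, here is one generic plain-Java answer using mark()/reset(). Note the caveat: the marked region is buffered in memory, so this sidesteps rather than solves the large-file constraint raised above.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Wrap the stream in a BufferedInputStream, mark() before the first pass, and
// reset() to rewind for the second pass. Works only while the bytes read since
// mark() stay within the given mark limit.
public class MarkResetDemo
{
    static String[] readTwice( InputStream raw, int markLimit ) throws IOException
    {
        BufferedInputStream in = new BufferedInputStream( raw );
        in.mark( markLimit );                           // remember the start
        String first = new String( in.readAllBytes(), StandardCharsets.UTF_8 );
        in.reset();                                     // rewind to the mark
        String second = new String( in.readAllBytes(), StandardCharsets.UTF_8 );
        return new String[] { first, second };
    }

    public static void main( String[] args ) throws IOException
    {
        InputStream source = new ByteArrayInputStream( "<cxml>...</cxml>".getBytes() );
        String[] passes = readTwice( source, 1 << 20 ); // up to 1 MB re-readable
        System.out.println( passes[0].equals( passes[1] ) );   // prints "true"
    }
}
```

For content too large to buffer, the NiFi-idiomatic route is the one Mark describes above: call session.read() once per pass instead of rewinding a single stream.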
>>>
>>> <ReadSplitWrite.java><ReadSplitWriteTest.java>
>>
>