Dave,

Not a problem.

The FlowFile object itself is immutable. If you want to modify the FlowFile, 
you do so by asking 
the session to give you a new version of the FlowFile with some update. For 
instance, by adding 
an attribute or changing the content of the FlowFile.

So any call to session.putAttribute or session.write returns a new FlowFile. If 
you update
the line that calls putAttribute so that it stores the returned FlowFile into 
your 'parsed' variable,
you should be good to go.

So you would do:

FlowFile parsed = session.create(original);
parsed = session.putAttribute(parsed, CoreAttributes.FILENAME.key(), 
context.getProperty(PARSED_FILENAME).getValue());

Otherwise, you end up trying to modify the same version twice (once when you 
call session.putAttribute and 
again when you call session.write). This is what the message is complaining 
about.

Just looking through the code, a few other comments that I would offer:

* the "static boolean error = false;" is likely to cause problems. All 
instances of your processor would get the same 'error' variable. 
I would recommend you use an org.apache.nifi.util.BooleanHolder object (defined 
in the nifi-utils module) and define 
it within your onTrigger method, rather than using a member variable.

* Experience has shown that with any log message, you should log the FlowFIle 
that you are referring to. You can
also parameterize your log messages. For example:

logger.error("Failed to parse {}; routing to failure", new Object[] {original});

rather than

logger.error("parsing to failure");


I hope this helps! Let us know if you're still having problems!

Thanks
-Mark

________________________________
> Date: Sat, 15 Aug 2015 19:41:00 +0000 
> From: [email protected] 
> To: [email protected] 
> Subject: Re: Writing to a flowfile 
> 
> Mark 
> 
> Thanks for your help. I have used the snippet of code you sent and it 
> works although I am fairly sure I haven't implemented it correctly, I 
> have had to put all of my code in the OnTrigger method, instead of in 
> the the callback. 
> I also need to change the filename attribute of the parsed flowfile, I 
> have inserted the following line: 
> 
> session.putAttribute(parsed, CoreAttributes.FILENAME.key(), 
> context.getProperty(PARSED_FILENAME).getValue()); 
> 
> But it gives me the following error: 
> 2015-08-15 21:28:55,628 ERROR [Timer-Driven Process Thread-5] 
> o.a.nifi.processors.standard.ParseMyData 
> ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] 
> ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] failed to process 
> due to org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
>  
> is not the most recent version of this FlowFile within this session 
> (StandardProcessSession[id=21562]); rolling back session: 
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
>  
> is not the most recent version of this FlowFile within this session 
> (StandardProcessSession[id=21562]) 
> 
> 
> I have attached my processor class, I would be grateful if you could 
> give it a quick look and tell me what I have done wrong. 
> 
> Many thanks 
> Dave 
> 
> 
> 
> On Saturday, 15 August 2015, 13:16, Mark Payne <[email protected]> wrote: 
> 
> 
> David, 
> 
> In this case, since you want to keep the original intact, you will need 
> to create a 'child' flowfile to write to. 
> You do this with ProcessSession.create(FlowFile) 
> 
> So you will have code that looks something like this: 
> 
> final FlowFile original = session.get(); 
> if (original == null) { 
> return; 
> } 
> 
> // create a new 'child' FlowFile. The framework will automatically handle 
> // the provenance information so that 'parsed' is forked from 'original'. 
> FlowFile parsed = session.create(original); 
> 
> // Get an OutputStream for the 'parsed' FlowFile 
> parsed = session.write(parsed, new OutputStreamCallback() { 
> public void process(OutputStream parsedOut) { 
> 
> // Get an InputStream for the original 
> session.read(original, new InputStreamCallback() { 
> public void process(InputStream originalIn) { 
> // read from original FlowFile via originalIn 
> // write to new FlowFile via parsedOut 
> } 
> }); 
> 
> } 
> }); 
> 
> Does this give you what you need? If anything is still unclear, let us know! 
> 
> Thanks 
> -Mark 
> 
> ---------------------------------------- 
>> Date: Sat, 15 Aug 2015 10:04:54 +0100 
>> From: [email protected]<mailto:[email protected]> 
>> Subject: Writing to a flowfile 
>> To: [email protected]<mailto:[email protected]> 
>> 
>> 
>> Hi 
>> 
>> I'm writing a processor which parses a file, I want the parsed file 
> to go to relationship parsed, and the original file to go to 
> relationship original, if the parse was ok. 
>> If the parse fails I want the original file to go to relationship failure. 
>> 
>> I have an inner class which contains a callback which does the 
> parsing. The callback is called from the onTrigger method. 
>> My problem is that I want to read from my original flowFile and write 
> to a new flowFile, but it always seems to write to the original 
> flowfile. 
>> How do I direct my bufferedwriter to my new flowfile? 
>> 
>> Many thanks 
>> Dave 
>> 
>> Sent from Yahoo! Mail on Android 
>> 
> 
> 
> 
                                          

Reply via email to