Mark,
Thanks very much for all of your help; that works really well. I have also
taken on board your other comments and implemented them on my home version. I
will use it all at work tomorrow.
As you may have seen on a post I made in July, I have taken the put & get JMS
processors and made a modified version for using with an AMQP broker. They
appear to work well and my boss (John Thorp) would like me to contribute them
back to org.apache.nifi.
Before I can do that I need to write some JUnit tests, but I have no idea how
to mock an AMQP broker/queue. To contribute the code for consideration, do I
need to create my own branch in the development code, insert my code and then
push it back up? Currently my code is on GitHub (link in July posts).
Thanks again for your help
Dave
On Saturday, 15 August 2015, 22:39, Mark Payne <[email protected]>
wrote:
Dave,
Not a problem.
The FlowFile object itself is immutable. If you want to modify the FlowFile,
you do so by asking the session to give you a new version of the FlowFile with
some update, for instance by adding an attribute or changing the content of
the FlowFile.
So any call to session.putAttribute or session.write returns a new FlowFile.
If you update the line that calls putAttribute so that it stores the returned
FlowFile into your 'parsed' variable, you should be good to go.
So you would do:
FlowFile parsed = session.create(original);
parsed = session.putAttribute(parsed, CoreAttributes.FILENAME.key(),
context.getProperty(PARSED_FILENAME).getValue());
Otherwise, you end up trying to modify the same version twice (once when you
call session.putAttribute and again when you call session.write). This is what
the message is complaining about.
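To make the versioning rule above concrete, here is a toy model in plain Java. ToyFlowFile and ToySession are hypothetical stand-ins written only for this illustration. They are not NiFi classes, and the real session tracks far more than a version counter. The sketch just shows why an update made through a stale reference is rejected.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an immutable FlowFile: an id, a version, attributes.
final class ToyFlowFile {
    final long id;
    final int version;
    final Map<String, String> attributes;

    ToyFlowFile(long id, int version, Map<String, String> attributes) {
        this.id = id;
        this.version = version;
        // Defensive copy so that no caller can change this version afterwards.
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }
}

// Hypothetical stand-in for a session that remembers the newest version per id.
final class ToySession {
    private final Map<Long, Integer> latestVersion = new HashMap<>();
    private long nextId = 1;

    ToyFlowFile create() {
        ToyFlowFile created = new ToyFlowFile(nextId++, 0, new HashMap<String, String>());
        latestVersion.put(created.id, created.version);
        return created;
    }

    // Returns a NEW object; the argument itself is never modified.
    ToyFlowFile putAttribute(ToyFlowFile flowFile, String key, String value) {
        Integer latest = latestVersion.get(flowFile.id);
        if (latest == null || latest != flowFile.version) {
            throw new IllegalStateException(
                "version " + flowFile.version + " is not the most recent version");
        }
        Map<String, String> updated = new HashMap<>(flowFile.attributes);
        updated.put(key, value);
        ToyFlowFile next = new ToyFlowFile(flowFile.id, flowFile.version + 1, updated);
        latestVersion.put(next.id, next.version);
        return next;
    }
}

class ToyVersionDemo {
    public static void main(String[] args) {
        ToySession session = new ToySession();
        ToyFlowFile parsed = session.create();
        // Reassigning 'parsed' each time keeps the reference pointing at the newest version.
        parsed = session.putAttribute(parsed, "filename", "parsed.txt");
        parsed = session.putAttribute(parsed, "note", "second update works");
        System.out.println(parsed.attributes);
    }
}
```

If the first putAttribute result were discarded instead of being stored back into 'parsed', the second call would throw, which mirrors the error in your log.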
Just looking through the code, a few other comments that I would offer:
* The "static boolean error = false;" is likely to cause problems. All
instances of your processor would get the same 'error' variable. I would
recommend you use an org.apache.nifi.util.BooleanHolder object (defined in the
nifi-utils module) and define it within your onTrigger method, rather than
using a member variable.
* Experience has shown that with any log message, you should log the FlowFile
that you are referring to. You can also parameterize your log messages. For
example:
logger.error("Failed to parse {}; routing to failure", new Object[] {original});
rather than
logger.error("parsing to failure");
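On the first point above (the static 'error' flag), a minimal sketch of the local-holder pattern in plain Java follows. It uses java.util.concurrent.atomic.AtomicBoolean as a stand-in for BooleanHolder so that it compiles without NiFi on the classpath. HolderSketch and its parse method are hypothetical names, and the "empty input is an error" rule is invented purely for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class HolderSketch {

    // Hypothetical parse step. The flag lives inside the method, so each call
    // (and therefore each processor instance and thread) gets its own copy.
    static boolean parse(final String input) {
        // A local holder is needed because an anonymous inner class cannot
        // assign to a plain local boolean of the enclosing method.
        final AtomicBoolean error = new AtomicBoolean(false);

        Runnable callback = new Runnable() {
            @Override
            public void run() {
                // Invented rule for illustration: empty input counts as a failure.
                if (input.isEmpty()) {
                    error.set(true);
                }
            }
        };
        callback.run();

        // The caller decides where to route based on the per-call result.
        return !error.get();
    }

    public static void main(String[] args) {
        System.out.println(parse("some data"));
        System.out.println(parse(""));
    }
}
```

Because nothing is stored in a static or member field, a failure in one call cannot leak into the next one.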
I hope this helps! Let us know if you're still having problems!
Thanks
-Mark
________________________________
> Date: Sat, 15 Aug 2015 19:41:00 +0000
> From: [email protected]
> To: [email protected]
> Subject: Re: Writing to a flowfile
>
> Mark
>
> Thanks for your help. I have used the snippet of code you sent and it
> works, although I am fairly sure I haven't implemented it correctly: I
> have had to put all of my code in the onTrigger method, instead of in
> the callback.
> I also need to change the filename attribute of the parsed flowfile, I
> have inserted the following line:
>
> session.putAttribute(parsed, CoreAttributes.FILENAME.key(),
> context.getProperty(PARSED_FILENAME).getValue());
>
> But it gives me the following error:
> 2015-08-15 21:28:55,628 ERROR [Timer-Driven Process Thread-5]
> o.a.nifi.processors.standard.ParseMyData
> ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b]
> ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] failed to process
> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
>
> is not the most recent version of this FlowFile within this session
> (StandardProcessSession[id=21562]); rolling back session:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
>
> is not the most recent version of this FlowFile within this session
> (StandardProcessSession[id=21562])
>
>
> I have attached my processor class, I would be grateful if you could
> give it a quick look and tell me what I have done wrong.
>
> Many thanks
> Dave
>
>
>
> On Saturday, 15 August 2015, 13:16, Mark Payne <[email protected]> wrote:
>
>
> David,
>
> In this case, since you want to keep the original intact, you will need
> to create a 'child' flowfile to write to.
> You do this with ProcessSession.create(FlowFile)
>
> So you will have code that looks something like this:
>
> final FlowFile original = session.get();
> if (original == null) {
>     return;
> }
>
> // create a new 'child' FlowFile. The framework will automatically handle
> // the provenance information so that 'parsed' is forked from 'original'.
> FlowFile parsed = session.create(original);
>
> // Get an OutputStream for the 'parsed' FlowFile
> parsed = session.write(parsed, new OutputStreamCallback() {
>     public void process(final OutputStream parsedOut) throws IOException {
>
>         // Get an InputStream for the original
>         session.read(original, new InputStreamCallback() {
>             public void process(final InputStream originalIn) throws IOException {
>                 // read from original FlowFile via originalIn
>                 // write to new FlowFile via parsedOut
>             }
>         });
>
>     }
> });
>
> Does this give you what you need? If anything is still unclear, let us know!
>
> Thanks
> -Mark
>
> ----------------------------------------
>> Date: Sat, 15 Aug 2015 10:04:54 +0100
>> From: [email protected]
>> Subject: Writing to a flowfile
>> To: [email protected]
>>
>>
>> Hi
>>
>> I'm writing a processor which parses a file. I want the parsed file to go
>> to relationship parsed, and the original file to go to relationship
>> original, if the parse was ok.
>> If the parse fails I want the original file to go to relationship failure.
>>
>> I have an inner class which contains a callback which does the parsing.
>> The callback is called from the onTrigger method.
>> My problem is that I want to read from my original flowFile and write to a
>> new flowFile, but it always seems to write to the original flowFile.
>> How do I direct my BufferedWriter to my new flowFile?
>>
>> Many thanks
>> Dave
>>
>>
>
>
>