Dave,

Not a problem. I do remember a note from you about making those processors able 
to communicate with AMQP and wanting to contribute that back, which is great! I 
don't remember seeing a link to the code, though. I will have to go back and 
check again.

If I can find that link, I can try to give some more specific advice, but in 
general when I create a processor that interacts with an external endpoint, I 
will create a method that returns the client that I am using. It's often 
something super simple like:

protected Client getClient() {
    return client;
}

This way, in my unit test I can simply create a subclass that returns a 
mocked-out client:

final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
    @Override
    protected Client getClient() {
        return new MockClient();
    }
});

This way, I can easily mock the client out to do whatever I need, including 
things like throwing IOException to ensure that it is handled properly.
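
To make the idea concrete without pulling in NiFi or an AMQP library, here is 
a minimal, self-contained sketch of the same pattern. Every name in it 
(Client, MyProcessor, process, the two factory methods) is a hypothetical 
stand-in that I made up for illustration; none of it is NiFi's API, and a real 
processor would route FlowFiles to relationships rather than return strings.

```java
import java.io.IOException;

class MockClientSketch {

    // Hypothetical stand-in for the client library the processor wraps.
    interface Client {
        void send(String message) throws IOException;
    }

    // Hypothetical processor-like class; NOT a real NiFi Processor.
    static class MyProcessor {
        private final Client client;

        MyProcessor(Client client) {
            this.client = client;
        }

        // Single access point for the client, so a test subclass can override it.
        protected Client getClient() {
            return client;
        }

        // Returns "success" or "failure" to mimic routing to a relationship.
        String process(String message) {
            try {
                getClient().send(message);
                return "success";
            } catch (IOException e) {
                return "failure";
            }
        }
    }

    // Test helper: a processor whose client always throws IOException.
    static MyProcessor processorWithFailingClient() {
        return new MyProcessor(null) {
            @Override
            protected Client getClient() {
                return message -> {
                    throw new IOException("simulated broker outage");
                };
            }
        };
    }

    // Test helper: a processor whose client accepts every message.
    static MyProcessor processorWithWorkingClient() {
        return new MyProcessor(null) {
            @Override
            protected Client getClient() {
                return message -> { };
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(processorWithFailingClient().process("hello"));
        System.out.println(processorWithWorkingClient().process("hello"));
    }
}
```

Because the processor only ever reaches the client through getClient(), the 
test never needs a running broker; it only needs a subclass that hands back 
whatever behavior the test case calls for.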

Again, I'll try to find that link and offer more specific pointers if I can. 
If you have the link handy, please send it along in case I'm not able to find 
the one that you sent last time.

Thanks
-Mark

----------------------------------------
> Date: Sun, 16 Aug 2015 19:01:28 +0000
> From: [email protected]
> To: [email protected]
> Subject: Re: Writing to a flowfile
>
> Mark
> Thanks very much for all of your help, that works really well, I have also 
> taken on board your other comments and implemented them on my home version. I 
> will use it all at work tomorrow.
> As you may have seen on a post I made in July, I have taken the put & get JMS 
> processors and made a modified version for using with an AMQP broker. They 
> appear to work well and my boss (John Thorp) would like me to contribute them 
> back to org.apache.nifi.
> Before I can do that I need to write some JUnit tests, but I have no idea how 
> to mock an AMQP broker/queue. To contribute the code for consideration, do I 
> need to create my own branch in the development code, insert my code and then 
> push it back up? Currently my code is on GitHub (link in July posts).
> Thanks again for your help
> Dave
>
>
> On Saturday, 15 August 2015, 22:39, Mark Payne <[email protected]> wrote:
>
>
> Dave,
>
> Not a problem.
>
> The FlowFile object itself is immutable. If you want to modify the FlowFile, 
> you do so by asking
> the session to give you a new version of the FlowFile with some update. For 
> instance, by adding
> an attribute or changing the content of the FlowFile.
>
> So any call to session.putAttribute or session.write returns a new FlowFile. 
> If you update
> the line that calls putAttribute so that it stores the returned FlowFile into 
> your 'parsed' variable,
> you should be good to go.
>
> So you would do:
>
> FlowFile parsed = session.create(original);
> parsed = session.putAttribute(parsed, CoreAttributes.FILENAME.key(), 
> context.getProperty(PARSED_FILENAME).getValue());
>
> Otherwise, you end up trying to modify the same version twice (once when you 
> call session.putAttribute and
> again when you call session.write). This is what the message is complaining 
> about.
>
> Just looking through the code, a few other comments that I would offer:
>
> * the "static boolean error = false;" is likely to cause problems. All 
> instances of your processor would get the same 'error' variable.
> I would recommend you use an org.apache.nifi.util.BooleanHolder object 
> (defined in the nifi-utils module) and define
> it within your onTrigger method, rather than using a member variable.
>
> * Experience has shown that with any log message, you should log the FlowFile 
> that you are referring to. You can
> also parameterize your log messages. For example:
>
> logger.error("Failed to parse {}; routing to failure",
>     new Object[] {original});
>
> rather than
>
> logger.error("parsing to failure");
>
>
> I hope this helps! Let us know if you're still having problems!
>
> Thanks
> -Mark
>
> ________________________________
>> Date: Sat, 15 Aug 2015 19:41:00 +0000
>> From: [email protected]
>> To: [email protected]
>> Subject: Re: Writing to a flowfile
>>
>> Mark
>>
>> Thanks for your help. I have used the snippet of code you sent and it
>> works, although I am fairly sure I haven't implemented it correctly. I
>> have had to put all of my code in the onTrigger method, instead of in
>> the callback.
>> I also need to change the filename attribute of the parsed flowfile, I
>> have inserted the following line:
>>
>> session.putAttribute(parsed, CoreAttributes.FILENAME.key(),
>> context.getProperty(PARSED_FILENAME).getValue());
>>
>> But it gives me the following error:
>> 2015-08-15 21:28:55,628 ERROR [Timer-Driven Process Thread-5]
>> o.a.nifi.processors.standard.ParseMyData
>> ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b]
>> ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] failed to process
>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
>> is not the most recent version of this FlowFile within this session
>> (StandardProcessSession[id=21562]); rolling back session:
>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
>> is not the most recent version of this FlowFile within this session
>> (StandardProcessSession[id=21562])
>>
>>
>> I have attached my processor class, I would be grateful if you could
>> give it a quick look and tell me what I have done wrong.
>>
>> Many thanks
>> Dave
>>
>>
>>
>> On Saturday, 15 August 2015, 13:16, Mark Payne <[email protected]> wrote:
>>
>>
>> David,
>>
>> In this case, since you want to keep the original intact, you will need
>> to create a 'child' flowfile to write to.
>> You do this with ProcessSession.create(FlowFile)
>>
>> So you will have code that looks something like this:
>>
>> final FlowFile original = session.get();
>> if (original == null) {
>>     return;
>> }
>>
>> // create a new 'child' FlowFile. The framework will automatically handle
>> // the provenance information so that 'parsed' is forked from 'original'.
>> FlowFile parsed = session.create(original);
>>
>> // Get an OutputStream for the 'parsed' FlowFile
>> parsed = session.write(parsed, new OutputStreamCallback() {
>>     public void process(OutputStream parsedOut) {
>>
>>         // Get an InputStream for the original
>>         session.read(original, new InputStreamCallback() {
>>             public void process(InputStream originalIn) {
>>                 // read from original FlowFile via originalIn
>>                 // write to new FlowFile via parsedOut
>>             }
>>         });
>>
>>     }
>> });
>>
>> Does this give you what you need? If anything is still unclear, let us know!
>>
>> Thanks
>> -Mark
>>
>> ----------------------------------------
>>> Date: Sat, 15 Aug 2015 10:04:54 +0100
>>> From: [email protected]<mailto:[email protected]>
>>> Subject: Writing to a flowfile
>>> To: [email protected]<mailto:[email protected]>
>>>
>>>
>>> Hi
>>>
>>> I'm writing a processor which parses a file, I want the parsed file to go
>>> to relationship parsed, and the original file to go to relationship
>>> original, if the parse was ok.
>>> If the parse fails I want the original file to go to relationship failure.
>>>
>>> I have an inner class which contains a callback which does the parsing.
>>> The callback is called from the onTrigger method.
>>> My problem is that I want to read from my original flowFile and write to a
>>> new flowFile, but it always seems to write to the original flowfile.
>>> How do I direct my bufferedwriter to my new flowfile?
>>>
>>> Many thanks
>>> Dave
>>>
>>> Sent from Yahoo! Mail on Android
>>>
>>
>>
>>
>
>
>
                                          
