Joe,

Thanks for your response. So I have to handle every flow file that I get from the session, is that right? Would it be sufficient for me to call session.transfer(flowFile);? It's not really clear to me what that function does. Based on your response, it sounds like it should work, but I wanted to confirm.

Thanks,
Mike.

On Fri, Mar 31, 2017 at 7:35 PM, Joe Witt <[email protected]> wrote:

> Mike,
>
> This part [1] needs to do something with those flowfiles. You've
> pulled them from the queue and then during this logic/flow do not do
> anything with them. You have a 'toProcess' collection and perhaps you
> need a "toRetry" collection for those that exceed the total data size.
> I believe you can transfer the flowfile back to itself with
> transfer().
>
> The general way to think of the process session is that it tracks
> everything you pull or create and checks that you've decided what to
> do with each item. If you've left some object in an unhandled state,
> it will reject your session and cause a rollback, to ensure things
> stay consistent and you have a chance to correct the issue.
>
> [1] https://github.com/synack/nifi-gcp-pubsub-publisher/blob/master/src/main/java/com/synack/nifi/gcp/pubsub/publisher/GcpPubsubPublisher.java#L179-L181
>
> Thanks
> Joe
>
> On Fri, Mar 31, 2017 at 8:26 PM, Mikhail Sosonkin
> <[email protected]> wrote:
> > Hi All,
> >
> > Wondering if someone could help me with a custom processor error. I'm
> > getting an error when doing a session commit in onTrigger [0].
> >
> > 017-03-31 23:50:44,684 ERROR [Timer-Driven Process Thread-8]
> > c.s.n.g.p.publisher.GcpPubsubPublisher
> > org.apache.nifi.processor.exception.FlowFileHandlingException:
> > StandardFlowFileRecord[uuid=cd7e3cc7-9a3b-4734-b080-b7ac7a2e673a,claim=StandardContentClaim
> > [resourceClaim=StandardResourceClaim[id=1491003549167-3033966,
> > container=default, section=878], offset=16831,
> > length=231],offset=0,name=18760161951543666,size=231]
> > transfer relationship not specified
> >     at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:234) ~[nifi-framework-core-1.1.1.jar:1.1.1]
> >     at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:304) ~[nifi-framework-core-1.1.1.jar:1.1.1]
> >     at com.synack.nifi.gcp.pubsub.publisher.GcpPubsubPublisher.onTrigger(GcpPubsubPublisher.java:207) ~[na:na]
> >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.1.1.jar:1.1.1]
> >
> > The flow file logic is super simple. Get a whole bunch, either ignore them,
> > transfer them to a relationship or delete them. Most get deleted from the
> > session. Not quite sure what I'm missing here. Do I need to free the
> > FlowFiles somehow? Do I need to 'unget' the files I got from the session -
> > those that didn't get transferred or removed?
> >
> > Thanks for your help!
> >
> > Mike.
> >
> > [0] https://github.com/synack/nifi-gcp-pubsub-publisher/blob/master/src/main/java/com/synack/nifi/gcp/pubsub/publisher/GcpPubsubPublisher.java#L150
> >
> > --
> > This email may contain material that is confidential for the sole use of
> > the intended recipient(s). Any review, reliance or distribution or
> > disclosure by others without express permission is strictly prohibited. If
> > you are not the intended recipient, please contact the sender and delete
> > all copies of this message.
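
For readers following the thread, the bookkeeping Joe describes can be sketched with a toy model. This is not the NiFi API: ToySession, its String "flow files" and the relationship names are all hypothetical stand-ins, and the real session does considerably more. The sketch only illustrates the rule that every item pulled from the queue needs an outcome before commit: transferred to a relationship, removed, or handed back to the incoming queue. In NiFi itself, the single-argument ProcessSession.transfer(FlowFile) is, as far as I know, the call that hands a flow file back to the queue it was pulled from, which matches Joe's suggestion.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

final class SessionSketch {

    /** Toy stand-in for a process session (hypothetical, NOT the NiFi API). */
    static final class ToySession {
        private final Queue<String> inputQueue;
        // Outcome chosen for each item pulled in this session; null means undecided.
        private final Map<String, String> outcomes = new LinkedHashMap<>();

        ToySession(Queue<String> inputQueue) {
            this.inputQueue = inputQueue;
        }

        /** Pull up to max items from the queue and start tracking them. */
        List<String> get(int max) {
            List<String> pulled = new ArrayList<>();
            while (pulled.size() < max && !inputQueue.isEmpty()) {
                String flowFile = inputQueue.poll();
                outcomes.put(flowFile, null);
                pulled.add(flowFile);
            }
            return pulled;
        }

        /** Route an item to a named relationship. */
        void transfer(String flowFile, String relationship) {
            outcomes.put(flowFile, relationship);
        }

        /** Hand an item back to the queue it came from, to be retried later. */
        void transfer(String flowFile) {
            outcomes.put(flowFile, "SELF");
        }

        /** Drop an item entirely. */
        void remove(String flowFile) {
            outcomes.put(flowFile, "REMOVED");
        }

        /** Commit only if every tracked item has an outcome; otherwise roll back. */
        void commit() {
            for (Map.Entry<String, String> entry : outcomes.entrySet()) {
                if (entry.getValue() == null) {
                    String unhandled = entry.getKey();
                    rollback();
                    throw new IllegalStateException(
                            unhandled + " transfer relationship not specified");
                }
            }
            for (Map.Entry<String, String> entry : outcomes.entrySet()) {
                if ("SELF".equals(entry.getValue())) {
                    inputQueue.add(entry.getKey());
                }
            }
            outcomes.clear();
        }

        /** Put everything pulled in this session back on the queue. */
        void rollback() {
            inputQueue.addAll(outcomes.keySet());
            outcomes.clear();
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("ff-1", "ff-2", "ff-3"));
        ToySession session = new ToySession(queue);

        // First attempt: ff-3 is pulled but never handled, so commit is rejected.
        List<String> batch = session.get(3);
        session.transfer(batch.get(0), "success");
        session.remove(batch.get(1));
        try {
            session.commit();
        } catch (IllegalStateException e) {
            System.out.println("rolled back: " + e.getMessage());
        }

        // Second attempt: the leftover item is explicitly sent back to the queue.
        batch = session.get(3);
        session.transfer(batch.get(0), "success");
        session.remove(batch.get(1));
        session.transfer(batch.get(2));
        session.commit();
        System.out.println("queue after commit: " + queue); // prints "queue after commit: [ff-3]"
    }
}
```

In the processor from the thread, this would correspond to collecting the flow files that exceed the size budget (Joe's "toRetry" idea) and giving each of them an explicit outcome before the commit, rather than leaving them untouched.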
