WayneZhou created STORM-2292:
--------------------------------

             Summary: Kafka spout enhancement for out-of-range edge cases
                 Key: STORM-2292
                 URL: https://issues.apache.org/jira/browse/STORM-2292
             Project: Apache Storm
          Issue Type: Improvement
          Components: storm-kafka
    Affects Versions: 0.10.0
            Reporter: WayneZhou
             Fix For: 0.10.0
         Attachments: KafkaSpout.java.txt

@hmcl and all, we have been communicating via email for a while. Going forward,
let's discuss in this thread so everyone is on the same page.
Based on the community spout (written by you), we have made several fixes, and
it has been running stably in our production environment for about 6 months.

We would like to share our latest spout with you. Could you please help review
it and merge any fixes you find reasonable into the community version? We want
to avoid diverging too much from the community version.

Below are our major fixes:

For a failed message, the nextTuple method originally seeks back to the first
non-contiguous offset so that the failed message is polled again for retry.
Say we seek back to message 10 for a retry, but the Kafka log file has been
purged and the earliest offset is now 1000. The consumer seeks to 10 but is
then reset to 1000 per the reset policy, so message 10 can never be polled
again and the spout stops working.
Our fix: we catch the out-of-range exception ourselves, first commit the
earliest offset, and then seek to the earliest offset.
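
A minimal, dependency-free sketch of this recovery path. It is not the attached
KafkaSpout code: `PartitionLog`, `OffsetOutOfRange` and `RetrySeeker` are
hypothetical stand-ins for a Kafka partition, Kafka's `OffsetOutOfRangeException`
and the spout's retry logic. With the real kafka-clients consumer, that exception
is only surfaced from `poll()` when `auto.offset.reset` is `none`, and the commit
and seek steps would map to `commitSync(...)` and `seek(...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's OffsetOutOfRangeException (no kafka-clients dependency).
class OffsetOutOfRange extends RuntimeException {
    OffsetOutOfRange(long requested, long earliest) {
        super("offset " + requested + " is out of range; earliest is " + earliest);
    }
}

// Hypothetical in-memory partition whose oldest records can be purged by retention.
class PartitionLog {
    private long earliest;  // first offset still available
    private final long end; // next offset to be written (exclusive upper bound)

    PartitionLog(long earliest, long end) {
        this.earliest = earliest;
        this.end = end;
    }

    long earliestOffset() { return earliest; }

    void purgeUpTo(long newEarliest) { earliest = Math.max(earliest, newEarliest); }

    // Returns up to maxRecords offsets starting at 'from'; throws if 'from' was purged or is past the end.
    List<Long> fetch(long from, int maxRecords) {
        if (from < earliest || from > end) {
            throw new OffsetOutOfRange(from, earliest);
        }
        List<Long> records = new ArrayList<>();
        for (long o = from; o < end && records.size() < maxRecords; o++) {
            records.add(o);
        }
        return records;
    }
}

// Retry path: seek back to a failed offset; if it was purged, commit the earliest
// offset first and then seek there, instead of relying on a reset policy.
class RetrySeeker {
    private final PartitionLog log;
    private long committed; // hypothetical committed offset for this partition
    private long position;  // offset the next fetch starts from

    RetrySeeker(PartitionLog log, long committed) {
        this.log = log;
        this.committed = committed;
        this.position = committed;
    }

    long committedOffset() { return committed; }

    List<Long> seekBackAndPoll(long failedOffset, int maxRecords) {
        position = failedOffset;
        try {
            return pollFromPosition(maxRecords);
        } catch (OffsetOutOfRange e) {
            long earliest = log.earliestOffset();
            committed = Math.max(committed, earliest); // 1) commit, so the purged offset is never awaited
            position = earliest;                        // 2) then seek to the earliest offset
            return pollFromPosition(maxRecords);
        }
    }

    private List<Long> pollFromPosition(int maxRecords) {
        List<Long> batch = log.fetch(position, maxRecords);
        position += batch.size();
        return batch;
    }
}
```

For example, with a log holding offsets 0 to 1999 that is then purged up to
1000, `seekBackAndPoll(10, 5)` returns offsets 1000 to 1004 and leaves the
committed offset at 1000.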

Currently, the logic for finding the next offset to commit is very complex and
does not handle some edge cases well: a) no messages are acked back, because a
bolt has an issue or cannot keep up with the spout's emit rate; b) seeking back
happens frequently, much faster than messages are acked back.
We give each message a status: None, Emitted, Acked, or Failed (if the failure
count exceeds the maximum number of retries, the status is set to Acked).
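
A simplified sketch of this status-based bookkeeping, assuming a single
partition with contiguous offsets (no compaction gaps); `OffsetTracker`,
`maxRetries` and the enum names are hypothetical, not taken from the attached
code. The next offset to commit is the end of the run of Acked messages that
starts at the current committed offset, so a message that is never acked holds
the commit back, while one that exhausts its retries counts as Acked and lets
the commit advance.

```java
import java.util.HashMap;
import java.util.Map;

// Per-message status as described above; a message past maxRetries failures is marked ACKED.
enum MessageStatus { NONE, EMITTED, ACKED, FAILED }

// Hypothetical single-partition tracker; assumes offsets are contiguous.
class OffsetTracker {
    private final Map<Long, MessageStatus> status = new HashMap<>();
    private final Map<Long, Integer> failures = new HashMap<>();
    private final int maxRetries;
    private long committed; // every offset below this is done

    OffsetTracker(long committed, int maxRetries) {
        this.committed = committed;
        this.maxRetries = maxRetries;
    }

    // Offsets with no recorded status have not been emitted yet.
    MessageStatus statusOf(long offset) {
        return status.getOrDefault(offset, MessageStatus.NONE);
    }

    void emitted(long offset) { status.put(offset, MessageStatus.EMITTED); }

    void acked(long offset) { status.put(offset, MessageStatus.ACKED); }

    void failed(long offset) {
        int n = failures.merge(offset, 1, Integer::sum);
        // Give up after maxRetries so a single bad message cannot block commits forever.
        status.put(offset, n > maxRetries ? MessageStatus.ACKED : MessageStatus.FAILED);
    }

    // Advance over the contiguous run of ACKED offsets starting at the committed offset.
    long nextCommitOffset() {
        while (statusOf(committed) == MessageStatus.ACKED) {
            status.remove(committed);
            failures.remove(committed);
            committed++;
        }
        return committed;
    }
}
```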

One of our use cases needs ordering at the partition level, so after seeking
back for a retry, we re-emit all of the following messages, whether or not they
have already been emitted. If possible, could you make this configurable:
either re-emit all messages starting from the failed one, or emit only the
failed one, as the current version does?
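
A sketch of the proposed option, assuming a hypothetical `RetryMode` setting
and that the spout has just sought back to the earliest failed offset and
polled the records in `polled`. `emittedOrAcked` is assumed to hold offsets
that are in flight or acked, with failed offsets removed from it when they
fail; this is an assumed model of the current behavior, not its actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical configuration option for retry behavior.
enum RetryMode {
    REEMIT_FROM_FAILED, // replay every polled message from the failed offset on, preserving partition order
    FAILED_ONLY         // assumed current behavior: skip messages that are in flight or already acked
}

class RetryPlanner {
    // polled: records returned after seeking back to the earliest failed offset, in partition order.
    // emittedOrAcked: offsets currently in flight or acked (failed offsets are removed on failure).
    static List<Long> toEmit(RetryMode mode, List<Long> polled, Set<Long> emittedOrAcked) {
        List<Long> out = new ArrayList<>();
        for (long offset : polled) {
            if (mode == RetryMode.REEMIT_FROM_FAILED || !emittedOrAcked.contains(offset)) {
                out.add(offset);
            }
        }
        return out;
    }
}
```

Re-emitting from the failed offset keeps per-partition order at the cost of
duplicate emits for messages that were already in flight or acked; emitting
only the failed messages avoids those duplicates but can deliver messages out
of order.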

We also record the counts of acked, failed, and emitted messages, for statistics
only.

Could you please review this and let us know whether you can merge it into the
community version? Please feel free to share any comments or concerns. By the
way, our code is attached to this JIRA.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
