[
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276541#comment-15276541
]
ASF GitHub Bot commented on STORM-822:
--------------------------------------
Github user jianbzhou commented on the pull request:
https://github.com/apache/storm/pull/1131#issuecomment-217905885
thanks Hmcl.
Just found below log constantly show up, seems it constantly try to commit
one offset which is actually committed to kafka already – it might be caused by
group rebalance – so a smaller offset (smaller than the committed offset) is
acked back lately.
For example(it is our assumption, kindly correct me if wrong): one consumer
commit offset 1000, polled 1001~1050 messages and emitted, also message was
acked for 1001 ~ 1009, then a rebalance happened, another consumer poll message
from 1000 to 1025, and commit the offset to 1010, then the message 1010(was
emitted before the rebalance) was acked back. This will cause 1010 will never
be committed as per the logic in findNextCommitOffset method – because this
offset was already commited to kafka successfully.
Log is:
2016-05-09 03:02:14 io.ebay.rheos.KafkaSpout [INFO] Unexpected offset found
[37137]. OffsetEntry{topic-partition=oradb.core4-lcr.caty.ebay-bids-3,
fetchOffset=37138, committedOffset=37137,
ackedMsgs={topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137,
numFails=0}|{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137,
numFails=0}}
We applied below fix - For OffsetEntry.add(KafkaSpoutMessageId msgId)
method, we changed the code as per below – only add acked message when its
offset is bigger than the committed offset.
public void add(KafkaSpoutMessageId msgId) { // O(Log N)
**_if(msgId.offset() > committedOffset)//this line is newly added_**
ackedMsgs.add(msgId);
}
Could you please help take a look at the above and let me know your
thoughts? Thanks.
> Kafka Spout New Consumer API
> ----------------------------
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
> Issue Type: Story
> Components: storm-kafka
> Reporter: Thomas Becker
> Assignee: Hugo Louro
> Fix For: 1.0.0, 2.0.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)