Github user jianbzhou commented on the pull request:
https://github.com/apache/storm/pull/1131#issuecomment-217905885
Thanks, Hmcl.
I just found the log below showing up constantly. It seems the spout keeps
trying to commit an offset that has actually been committed to Kafka already.
This might be caused by a group rebalance, after which a smaller offset
(smaller than the committed offset) is acked back late.
For example (this is our assumption, kindly correct me if it is wrong): one
consumer commits offset 1000, polls and emits messages 1001~1050, and messages
1001~1009 are acked. Then a rebalance happens; another consumer polls messages
from 1000 to 1025 and commits offset 1010. After that, message 1010 (which was
emitted before the rebalance) is acked back. As a result, 1010 will never
be committed by the logic in the findNextCommitOffset method, because this
offset was already committed to Kafka successfully.
The log is:
2016-05-09 03:02:14 io.ebay.rheos.KafkaSpout [INFO] Unexpected offset found
[37137]. OffsetEntry{topic-partition=oradb.core4-lcr.caty.ebay-bids-3,
fetchOffset=37138, committedOffset=37137,
ackedMsgs={topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137,
numFails=0}|{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137,
numFails=0}}
We applied the fix below. In the OffsetEntry.add(KafkaSpoutMessageId msgId)
method, we changed the code as follows: an acked message is only added when its
offset is bigger than the committed offset.
public void add(KafkaSpoutMessageId msgId) { // O(Log N)
    if (msgId.offset() > committedOffset) { // this line is newly added
        ackedMsgs.add(msgId);
    }
}
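To make the effect of this guard easier to check in isolation, here is a minimal standalone sketch. It is not the actual spout code: OffsetTracker, markCommitted and pendingAcks are hypothetical names used only for illustration, and plain long offsets stand in for KafkaSpoutMessageId.

```java
import java.util.TreeSet;

// Simplified stand-in for the per-partition offset bookkeeping.
// All names here are hypothetical; this is not Storm's OffsetEntry.
class OffsetTracker {
    private long committedOffset;                        // last offset committed to Kafka
    private final TreeSet<Long> ackedOffsets = new TreeSet<>();

    OffsetTracker(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    // Record an ack only if it is ahead of the committed offset.
    // Returns false for a stale ack, e.g. a tuple emitted before a rebalance.
    boolean add(long offset) {
        if (offset > committedOffset) {
            return ackedOffsets.add(offset);
        }
        return false;
    }

    // Assumed helper: note a commit and drop acks it already covers.
    void markCommitted(long offset) {
        committedOffset = offset;
        ackedOffsets.headSet(offset, true).clear();
    }

    // Number of acked offsets still waiting to be committed.
    int pendingAcks() {
        return ackedOffsets.size();
    }
}
```

With the values from the log above, new OffsetTracker(37137L).add(37137L) returns false and nothing is queued, while add(37138L) is accepted, so a late ack for an already committed offset can no longer sit in the set forever.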
Could you please help take a look at the above and let me know your
thoughts? Thanks.