[ https://issues.apache.org/jira/browse/BOOKKEEPER-442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13490182#comment-13490182 ]
Jiannan Wang commented on BOOKKEEPER-442:
-----------------------------------------
Thanks, Ivan, for the review; I'll update the patch.
Sorry for the delayed reply. I'm moving and can't access the Internet these days.
> Failed to deliver messages due to inconsistency between SubscriptionState and
> LedgerRanges.
> -------------------------------------------------------------------------------------------
>
> Key: BOOKKEEPER-442
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-442
> Project: Bookkeeper
> Issue Type: Bug
> Components: hedwig-server
> Affects Versions: 4.1.0, 4.2.0
> Reporter: Sijie Guo
> Assignee: Jiannan Wang
> Priority: Critical
> Fix For: 4.2.0
>
> Attachments: BOOKKEEPER-442.diff
>
>
> The problem occurs when updateSubscriptionState fails but the consumed
> ledgers have already been deleted.
> The issue is described below:
> 1) A subscriber calls setLastConsumeSeqId to move its consume pointer. If the
> pointer has moved past the consume interval, an update-subscription-state
> operation is issued to persist the new state to ZooKeeper.
> {code}
> // AbstractSubscriptionManager:
> if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {
>     updateSubscriptionState(topic, subscriberId, subState, cb, ctx);
> }
> {code}
> 2) Moving the consume pointer also changes the in-memory subscription state
> before that state has been persisted to ZooKeeper.
> {code}
> public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
>     long interval = lastConsumeSeqId.getLocalComponent()
>             - subscriptionState.getMsgId().getLocalComponent();
>     if (interval <= 0) {
>         return false;
>     }
>     // set consume seq id when it is larger
>     this.lastConsumeSeqId = lastConsumeSeqId;
>     if (interval < consumeInterval) {
>         return false;
>     }
>     // subscription state will be updated; mark it as clean
>     subscriptionState = SubscriptionState.newBuilder(subscriptionState)
>             .setMsgId(lastConsumeSeqId).build();
>     return true;
> }
> {code}
> 3) MessageConsumedTask runs periodically to delete consumed ledgers, and it
> uses the in-memory subscription state to decide what to delete. So if a ledger
> is deleted first and the subscription state update then fails, the state
> becomes inconsistent: when the hub restarts and the subscriber reconnects,
> delivery resumes from the old seq id, but the ledger holding the messages with
> that old seq id has already been deleted.
> {code}
> for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
>     if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < minConsumedMessage) {
>         minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
>     }
>     hasBound = hasBound && curSubscription.getSubscriptionPreferences().hasMessageBound();
> }
> {code}
> The fix is to let the message-consumed task use only the persisted
> subscription state when performing deletions.
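The failure sequence and the proposed fix can be sketched together as a minimal, hypothetical model; it is not Hedwig code. It assumes seq ids are reduced to their local component (a `long`), that a successful ZooKeeper update is reported through an invented `onPersisted` callback, and that `SubscriptionModel` and `ConsumedLedgerBound` are illustrative names only. The real setLastConsumeSeqId also records a separate consume pointer, which is omitted here.

```java
import java.util.Collection;

// Hypothetical, simplified model (not Hedwig code): a MessageSeqId is reduced
// to its local component, and the ZooKeeper write is simulated by a callback.
final class SubscriptionModel {
    // msgId of the in-memory SubscriptionState; advanced before persistence (step 2)
    private long inMemoryMsgId;
    // msgId acknowledged by ZooKeeper; what a restarted hub would read back
    private long persistedMsgId;

    SubscriptionModel(long initialMsgId) {
        this.inMemoryMsgId = initialMsgId;
        this.persistedMsgId = initialMsgId;
    }

    // Simplified setLastConsumeSeqId: returns true when an update should be
    // sent to ZooKeeper (step 1). The in-memory msgId moves immediately.
    boolean setLastConsumeSeqId(long seqId, int consumeInterval) {
        long interval = seqId - inMemoryMsgId;
        if (interval <= 0 || interval < consumeInterval) {
            return false;
        }
        inMemoryMsgId = seqId;
        return true;
    }

    // Hypothetical success callback of the ZooKeeper update; not called on failure.
    void onPersisted(long seqId) {
        persistedMsgId = Math.max(persistedMsgId, seqId);
    }

    long getInMemoryMsgId() { return inMemoryMsgId; }
    long getPersistedMsgId() { return persistedMsgId; }
}

final class ConsumedLedgerBound {
    // Current behavior (step 3): minimum msgId over the in-memory state.
    static long fromInMemoryState(Collection<SubscriptionModel> subs) {
        long min = Long.MAX_VALUE;
        for (SubscriptionModel s : subs) {
            min = Math.min(min, s.getInMemoryMsgId());
        }
        return min;
    }

    // Proposed fix: minimum msgId over the persisted state only.
    static long fromPersistedState(Collection<SubscriptionModel> subs) {
        long min = Long.MAX_VALUE;
        for (SubscriptionModel s : subs) {
            min = Math.min(min, s.getPersistedMsgId());
        }
        return min;
    }
}
```

For example, take two subscriptions starting at 0 with a consume interval of 50: `a` moves to 100 and its ZooKeeper write fails, while `b` moves to 80 and its write succeeds. The in-memory bound is then 80, so ledgers up to seq id 80 look deletable, yet the persisted bound is 0, which is where `a` would resume after a hub restart. Since the persisted bound can only lag the in-memory one, a failed write in this model delays ledger deletion instead of losing messages.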
--
This message is automatically generated by JIRA.