Sijie Guo created BOOKKEEPER-442:
------------------------------------

             Summary: Failed to deliver messages due to inconsistency between 
SubscriptionState and LedgerRanges.
                 Key: BOOKKEEPER-442
                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-442
             Project: Bookkeeper
          Issue Type: Bug
          Components: hedwig-server
    Affects Versions: 4.1.0, 4.2.0
            Reporter: Sijie Guo
            Assignee: Jiannan Wang
            Priority: Critical
             Fix For: 4.2.0


The problem occurs when updateSubscriptionState fails but the consumed 
ledgers have already been deleted. 

The issue is described as below:

1) A subscriber calls setLastConsumeSeqId to move its consume pointer. If the 
pointer has advanced by at least the consume interval, an update of the 
subscription state is issued to ZooKeeper.

{code}
AbstractSubscriptionManager:

            if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {
                updateSubscriptionState(topic, subscriberId, subState, cb, ctx);
            }
{code}

2) Moving the consume pointer also changes the in-memory subscription state 
before that state has been persisted to ZooKeeper.

{code}
    public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
        long interval = lastConsumeSeqId.getLocalComponent()
                - subscriptionState.getMsgId().getLocalComponent();
        if (interval <= 0) {
            return false;
        }

        // set consume seq id when it is larger
        this.lastConsumeSeqId = lastConsumeSeqId;
        if (interval < consumeInterval) {
            return false;
        }

        // subscription state will be updated, marked it as clean
        subscriptionState = SubscriptionState.newBuilder(subscriptionState)
                .setMsgId(lastConsumeSeqId).build();
        return true;
    }
{code}
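
To make the ordering concrete, here is a minimal sketch of the method above 
using plain long sequence ids in place of the protobuf MessageSeqId and 
SubscriptionState types. The class name SimpleSubState and its fields are 
assumptions made for illustration; they are not Hedwig classes.

```java
// Simplified stand-in for InMemorySubscriptionState (hypothetical, not Hedwig code).
class SimpleSubState {
    long stateMsgId;        // stands in for subscriptionState.getMsgId()
    long lastConsumeSeqId;  // latest consume pointer seen in memory

    SimpleSubState(long initial) {
        this.stateMsgId = initial;
        this.lastConsumeSeqId = initial;
    }

    // Mirrors the quoted logic: returns true when a ZooKeeper update is due.
    boolean setLastConsumeSeqId(long seqId, int consumeInterval) {
        long interval = seqId - stateMsgId;
        if (interval <= 0) {
            return false;
        }
        lastConsumeSeqId = seqId;
        if (interval < consumeInterval) {
            return false;
        }
        // The state advances here, before any ZooKeeper write has succeeded.
        stateMsgId = seqId;
        return true;
    }
}
```

Starting at 100 with a consume interval of 50, consuming up to 120 returns 
false and leaves stateMsgId at 100; consuming up to 160 returns true and moves 
stateMsgId to 160 immediately, whether or not the later ZooKeeper update 
succeeds.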

3) MessageConsumedTask runs periodically to delete consumed ledgers, and it 
uses the in-memory subscription state to decide what to delete. If a ledger is 
deleted first and the subscription state update then fails, the state becomes 
inconsistent: when the hub restarts and the subscriber reconnects, delivery 
starts from the old seq id, but the ledger holding the messages at that seq id 
has already been deleted.

{code}
for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
    if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < minConsumedMessage)
        minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
    hasBound = hasBound && curSubscription.getSubscriptionPreferences().hasMessageBound();
}
{code} 
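
The failure can be reproduced in a simplified model. The sketch below assumes 
each ledger covers a contiguous range of seq ids and is deleted once its last 
seq id is at or below the minimum consumed seq id; the class LedgerGcDemo and 
both of its methods are hypothetical and do not reproduce Hedwig's actual 
ledger range handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of ledger garbage collection (not Hedwig code).
class LedgerGcDemo {
    // Each ledger is {firstSeqId, lastSeqId}. Keep only ledgers that still
    // hold at least one message above the minimum consumed seq id.
    static List<long[]> survivingLedgers(List<long[]> ledgers, long minConsumed) {
        List<long[]> kept = new ArrayList<>();
        for (long[] ledger : ledgers) {
            if (ledger[1] > minConsumed) {
                kept.add(ledger);
            }
        }
        return kept;
    }

    // True if some remaining ledger still contains the given seq id.
    static boolean canDeliver(List<long[]> ledgers, long seqId) {
        for (long[] ledger : ledgers) {
            if (ledger[0] <= seqId && seqId <= ledger[1]) {
                return true;
            }
        }
        return false;
    }
}
```

With ledgers covering 1-100, 101-150 and 151-200, suppose ZooKeeper still 
holds seq id 100 because the update failed, while the in-memory state says 160. 
Collecting with 160 removes the first two ledgers, so after a restart the 
subscriber resumes from 100 and message 101 can no longer be delivered. 
Collecting with the persisted value 100 would have kept the 101-150 ledger.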

The fix would be to let the message consumed task use only the persisted state 
when performing deletions. 
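
One possible shape for such a fix, offered only as a sketch and not as the 
actual patch: track the persisted seq id separately from the in-memory one, 
advance it only after the ZooKeeper update has succeeded, and compute the 
deletion bound from it. The class TrackedSubState and its method names are 
assumptions for illustration.

```java
import java.util.Collection;

// Hypothetical sketch: separate in-memory and persisted consume positions.
class TrackedSubState {
    private long inMemoryMsgId;   // advanced as soon as the subscriber consumes
    private long persistedMsgId;  // advanced only after ZooKeeper confirms

    TrackedSubState(long initial) {
        this.inMemoryMsgId = initial;
        this.persistedMsgId = initial;
    }

    void advanceInMemory(long seqId) {
        if (seqId > inMemoryMsgId) {
            inMemoryMsgId = seqId;
        }
    }

    // Intended to be called from the success path of the update callback.
    void onPersisted(long seqId) {
        if (seqId > persistedMsgId) {
            persistedMsgId = seqId;
        }
    }

    long getInMemoryMsgId() {
        return inMemoryMsgId;
    }

    long getPersistedMsgId() {
        return persistedMsgId;
    }

    // Deletion bound based on persisted state only.
    // Returns Long.MAX_VALUE when there are no subscriptions.
    static long minPersisted(Collection<TrackedSubState> subs) {
        long min = Long.MAX_VALUE;
        for (TrackedSubState s : subs) {
            min = Math.min(min, s.getPersistedMsgId());
        }
        return min;
    }
}
```

If the update to ZooKeeper fails, onPersisted is never called, so the deletion 
bound stays at the last value known to be in ZooKeeper and no ledger that a 
reconnecting subscriber still needs is removed.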

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
