Sijie Guo created BOOKKEEPER-442:
------------------------------------
Summary: Failed to deliver messages due to inconsistency between
SubscriptionState and LedgerRanges.
Key: BOOKKEEPER-442
URL: https://issues.apache.org/jira/browse/BOOKKEEPER-442
Project: Bookkeeper
Issue Type: Bug
Components: hedwig-server
Affects Versions: 4.1.0, 4.2.0
Reporter: Sijie Guo
Assignee: Jiannan Wang
Priority: Critical
Fix For: 4.2.0
The problem occurs when updateSubscriptionState fails but the consumed
ledgers have already been deleted.
The issue is as follows:
1) A subscriber calls setLastConsumeSeqId to move its consume pointer. If the
consume pointer has moved by at least the consume interval, an update
subscription state operation is issued to persist the new state to ZooKeeper.
{code}
AbstractSubscriptionManager:
if (subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval())) {
    updateSubscriptionState(topic, subscriberId, subState, cb, ctx);
}
{code}
2) Moving the consume pointer also changes the in-memory subscription state
before the new state has been persisted to ZooKeeper.
{code}
public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
    long interval = lastConsumeSeqId.getLocalComponent()
            - subscriptionState.getMsgId().getLocalComponent();
    if (interval <= 0) {
        return false;
    }
    // set consume seq id when it is larger
    this.lastConsumeSeqId = lastConsumeSeqId;
    if (interval < consumeInterval) {
        return false;
    }
    // subscription state will be updated, marked it as clean
    subscriptionState = SubscriptionState.newBuilder(subscriptionState)
            .setMsgId(lastConsumeSeqId).build();
    return true;
}
{code}
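The divergence can be reproduced in isolation. The following is a minimal model, not Hedwig code: SubStateModel, its fields, and the boolean-driven updateSubscriptionState are hypothetical stand-ins, with message ids reduced to plain longs. It only illustrates that the in-memory message id advances as soon as the interval is crossed, whether or not the later ZooKeeper write succeeds.

```java
// Hypothetical, simplified model of the behaviour described above; not Hedwig code.
final class SubStateModel {
    long lastConsumeSeqId; // latest consume pointer reported by the subscriber
    long inMemoryMsgId;    // stands in for subscriptionState.getMsgId() held in memory
    long persistedMsgId;   // stands in for the value actually stored in ZooKeeper

    SubStateModel(long start) {
        lastConsumeSeqId = start;
        inMemoryMsgId = start;
        persistedMsgId = start;
    }

    // Mirrors the control flow of setLastConsumeSeqId quoted above.
    boolean setLastConsumeSeqId(long seqId, int consumeInterval) {
        long interval = seqId - inMemoryMsgId;
        if (interval <= 0) {
            return false;
        }
        lastConsumeSeqId = seqId;
        if (interval < consumeInterval) {
            return false;
        }
        inMemoryMsgId = seqId; // in-memory state changes before anything is persisted
        return true;
    }

    // Assumption: the ZooKeeper write is simulated by a flag; only success moves persistedMsgId.
    void updateSubscriptionState(boolean writeSucceeds) {
        if (writeSucceeds) {
            persistedMsgId = inMemoryMsgId;
        }
    }
}
```

In this model, after setLastConsumeSeqId(10, 10) followed by a failed update, inMemoryMsgId is 10 while persistedMsgId is still 0; anything that trusts the in-memory value now sees messages 1 to 10 as consumed.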
3) MessageConsumedTask runs periodically to delete consumed ledgers, and it
uses the in-memory subscription state to decide what to delete. If a ledger is
deleted first and the subscription state update then fails, the state becomes
inconsistent: when the hub restarts and the subscriber reconnects, delivery
starts from the old seq id, but the ledger holding the messages with that seq
id has already been deleted.
{code}
for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
    if (curSubscription.getSubscriptionState().getMsgId().getLocalComponent() < minConsumedMessage)
        minConsumedMessage = curSubscription.getSubscriptionState().getMsgId().getLocalComponent();
    hasBound = hasBound && curSubscription.getSubscriptionPreferences().hasMessageBound();
}
{code}
The fix would be to let the message consumed task use only the persisted
state when performing deletions.
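One possible shape for such a fix is sketched below. It is an illustration under stated assumptions, not the actual patch: TrackedSubscription, ConsumedLedgerBound, onPersistSuccess, and minPersistedConsumed are hypothetical names, and message ids are reduced to plain longs. The idea is to keep a separate persisted message id that advances only after the ZooKeeper update is known to have succeeded, and to compute the deletion bound from that value alone.

```java
import java.util.Collection;

// Hypothetical sketch; the class and method names are not part of Hedwig.
final class TrackedSubscription {
    private long inMemoryMsgId;  // may run ahead of what is stored in ZooKeeper
    private long persistedMsgId; // advanced only after a successful write

    TrackedSubscription(long start) {
        inMemoryMsgId = start;
        persistedMsgId = start;
    }

    // Called when the subscriber moves its consume pointer.
    void moveConsumePointer(long seqId) {
        if (seqId > inMemoryMsgId) {
            inMemoryMsgId = seqId;
        }
    }

    // Assumed to be invoked from the success callback of the ZooKeeper update.
    void onPersistSuccess(long seqId) {
        if (seqId > persistedMsgId) {
            persistedMsgId = seqId;
        }
    }

    long getPersistedMsgId() {
        return persistedMsgId;
    }
}

final class ConsumedLedgerBound {
    // Smallest persisted consume position across all subscriptions.
    // Returns Long.MAX_VALUE when there are no subscriptions.
    static long minPersistedConsumed(Collection<TrackedSubscription> subs) {
        long min = Long.MAX_VALUE;
        for (TrackedSubscription s : subs) {
            min = Math.min(min, s.getPersistedMsgId());
        }
        return min;
    }
}
```

With this split, a failed update leaves the persisted id unchanged, so the deletion bound cannot move past a position that a restarted hub would still need to deliver from.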