[
https://issues.apache.org/jira/browse/BOOKKEEPER-439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13481980#comment-13481980
]
Sijie Guo commented on BOOKKEEPER-439:
--------------------------------------
[[email protected]] You're right about 2), adding startSeqId to the ledger
range. You could add unit tests. For the unit tests to pass, you need to
include changes in bookkeeper-server to throw BKNoSuchLedgerExistsException
when deleting a non-existent ledger, and catch it when calling deleteLedger;
otherwise that ledger range entry would never be removed from the ledger ranges.
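A minimal sketch of the delete handling described above, in case it helps with
the unit test. Everything in it is a hypothetical stand-in (the class,
NoSuchLedgerException, the in-memory set and map); it is not the bookkeeper or
hedwig code. It only illustrates why the "no such ledger" error has to be
caught so that the range entry is still removed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory model, not the real bookkeeper/hedwig classes.
class DeleteLedgerSketch {
    // Stand-in for the "no such ledger" error named in the comment.
    static class NoSuchLedgerException extends Exception {}

    // Ledgers that still exist in storage.
    final Set<Long> ledgers = new HashSet<>();
    // Ledger ranges of a topic: end seq-id (inclusive) -> ledger id.
    final TreeMap<Long, Long> ledgerRanges = new TreeMap<>();

    // Deleting a ledger that does not exist is reported as an error.
    void deleteLedger(long ledgerId) throws NoSuchLedgerException {
        if (!ledgers.remove(ledgerId)) {
            throw new NoSuchLedgerException();
        }
    }

    // Removes a consumed ledger together with its range entry.
    void removeConsumedLedger(long endSeqId) {
        Long ledgerId = ledgerRanges.get(endSeqId);
        if (ledgerId == null) {
            return; // no such range entry
        }
        try {
            deleteLedger(ledgerId);
        } catch (NoSuchLedgerException e) {
            // The ledger is already gone (for example, an earlier attempt
            // deleted it but failed before updating the ranges). Fall
            // through so the stale range entry is still removed.
        }
        ledgerRanges.remove(endSeqId);
    }

    public static void main(String[] args) {
        DeleteLedgerSketch s = new DeleteLedgerSketch();
        s.ledgerRanges.put(100L, 1L); // range entry for a ledger that no longer exists
        s.removeConsumedLedger(100L);
        System.out.println(s.ledgerRanges.isEmpty());
    }
}
```

If the exception were propagated instead of caught, the range entry for the
already-deleted ledger would stay in the map on every retry.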
> No more messages delivered after deleted consumed ledgers.
> ----------------------------------------------------------
>
> Key: BOOKKEEPER-439
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-439
> Project: Bookkeeper
> Issue Type: Bug
> Components: hedwig-server
> Affects Versions: 4.1.0, 4.2.0
> Reporter: Sijie Guo
> Assignee: Sijie Guo
> Priority: Critical
> Fix For: 4.2.0
>
> Attachments: BOOKKEEPER-439.diff, BOOKKEEPER-439.diff,
> TopicMetadataAddStartSeqId.patch
>
>
> We encountered the exception below:
> {quote}
> 2012-10-18 09:27:27,248 - DEBUG [CacheThread:BookkeeperPersistenceManager$RangeScanOp@247] - Issuing a bk read for ledger: L2 from entry-id: 100 to entry-id: 103
> 2012-10-18 09:27:27,248 - ERROR [CacheThread:BookkeeperPersistenceManager$RangeScanOp$2@261] - Error while reading from ledger: L2 for topic: TOPIC
> org.apache.bookkeeper.client.BKException$BKReadException
>     at org.apache.bookkeeper.client.BKException.create(BKException.java:48)
>     at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp$2.safeReadComplete(BookkeeperPersistenceManager.java:260)
>     at org.apache.hedwig.zookeeper.SafeAsynBKCallback$ReadCallback.readComplete(SafeAsynBKCallback.java:61)
>     at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:380)
>     at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.read(BookkeeperPersistenceManager.java:252)
>     at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.startReadingFrom(BookkeeperPersistenceManager.java:327)
>     at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.runInternal(BookkeeperPersistenceManager.java:217)
>     at org.apache.hedwig.server.common.TopicOpQueuer$SynchronousOp.run(TopicOpQueuer.java:77)
>     at org.apache.hedwig.server.common.TopicOpQueuer.pushAndMaybeRun(TopicOpQueuer.java:105)
>     at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.scanMessages(BookkeeperPersistenceManager.java:336)
>     at org.apache.hedwig.server.persistence.ReadAheadCache$ScanRequestWrapper.performRequest(ReadAheadCache.java:704)
>     at org.apache.hedwig.server.persistence.ReadAheadCache.run(ReadAheadCache.java:291)
>     at java.lang.Thread.run(Thread.java:662)
> {quote}
> Topic TOPIC has 2 ledgers, L1 and L2; each ledger has 100 entries.
> 1) All 100 entries in L1 have been delivered and consumed.
> 2) 100 entries have been written to L2 but not delivered.
> 3) L1 is deleted since all its entries have been consumed.
> 4) The hub server shuts down.
> 5) TOPIC recovered L2 and started delivering from message 101.
> TOPIC was expected to issue a read of entries [0-3] from L2, but the
> exception log shows that a read of [100-103] was issued instead. L2 holds
> only entries 0-99, so nothing can be read from L2 at [100-103].
> The cause is that we still use 0 and 1 as the starting values for the
> message id and ledger id, even though some consumed ledgers have been deleted.
> {code}
>     void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
>         Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
>         TopicInfo topicInfo = new TopicInfo();
>         long startOfLedger = 1;
>         while (lrIterator.hasNext()) {
>             LedgerRange range = lrIterator.next();
>             if (range.hasEndSeqIdIncluded()) {
>                 // this means it was a valid and completely closed ledger
>                 long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
>                 topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
>                 startOfLedger = endOfLedger + 1;
>                 continue;
>             }
>             // If it doesn't have a valid end, it must be the last ledger
>             if (lrIterator.hasNext()) {
>                 String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
>                         + " is not the last one but still does not have an end seq-id";
>                 logger.error(msg);
>                 cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
>                 return;
>             }
>             // The last ledger does not have a valid seq-id, lets try to
>             // find it out
>             recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
>             return;
>         }
> {code}
> {code}
>     long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges.lastKey();
>     LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
>             .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
>     topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
>             new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
> {code}
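To make the arithmetic in the description concrete, here is a small standalone
sketch. It is not hedwig code: the class and method names are made up for
illustration, and it assumes that entry ids inside a ledger start at 0 and that
a message's entry id is its seq-id minus the seq-id of the ledger's first
message. With L1 (seq-ids 1..100) deleted and L2 holding seq-ids 101..200,
assuming a start of 1 reproduces the bad read [100-103], while a start of 101
gives the expected [0-3].

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, self-contained model of the seq-id to entry-id mapping.
class StartSeqIdSketch {
    // Entry id of a message inside its ledger, given the seq-id of the
    // first message stored in that ledger.
    static long entryId(long seqId, long ledgerStartSeqId) {
        return seqId - ledgerStartSeqId;
    }

    // Start seq-id of each remaining ledger, derived from the inclusive end
    // seq-ids and the start seq-id of the first remaining ledger.
    static List<Long> startSeqIds(long firstStartSeqId, long[] endSeqIdsIncluded) {
        List<Long> starts = new ArrayList<>();
        long start = firstStartSeqId;
        for (long end : endSeqIdsIncluded) {
            starts.add(start);
            start = end + 1;
        }
        return starts;
    }

    public static void main(String[] args) {
        long assumedStart = 1;   // start assumed when nothing records the deleted ledgers
        long recordedStart = 101; // start if the first remaining range recorded it

        // Delivery resumes at seq-ids 101..104.
        System.out.println(entryId(101, assumedStart) + "-" + entryId(104, assumedStart));   // prints 100-103
        System.out.println(entryId(101, recordedStart) + "-" + entryId(104, recordedStart)); // prints 0-3
    }
}
```

Recording the start seq-id with the ledger range is one way to keep the mapping
correct after earlier ledgers are deleted; the sketch does not claim that this
is how the attached patches implement it.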
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira