Sijie Guo created BOOKKEEPER-439:
------------------------------------

             Summary: No more messages delivered after deleted consumed ledgers.
                 Key: BOOKKEEPER-439
                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-439
             Project: Bookkeeper
          Issue Type: Bug
          Components: hedwig-server
    Affects Versions: 4.1.0, 4.2.0
            Reporter: Sijie Guo
            Assignee: Sijie Guo
            Priority: Critical
             Fix For: 4.2.0


We encountered the exception below:

{quote}

2012-10-18 09:27:27,248 - DEBUG [CacheThread:BookkeeperPersistenceManager$RangeScanOp@247] - Issuing a bk read for ledger: L2 from entry-id: 100 to entry-id: 103
2012-10-18 09:27:27,248 - ERROR [CacheThread:BookkeeperPersistenceManager$RangeScanOp$2@261] - Error while reading from ledger: L2 for topic: TOPIC
org.apache.bookkeeper.client.BKException$BKReadException
        at org.apache.bookkeeper.client.BKException.create(BKException.java:48)
        at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp$2.safeReadComplete(BookkeeperPersistenceManager.java:260)
        at org.apache.hedwig.zookeeper.SafeAsynBKCallback$ReadCallback.readComplete(SafeAsynBKCallback.java:61)
        at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:380)
        at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.read(BookkeeperPersistenceManager.java:252)
        at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.startReadingFrom(BookkeeperPersistenceManager.java:327)
        at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.runInternal(BookkeeperPersistenceManager.java:217)
        at org.apache.hedwig.server.common.TopicOpQueuer$SynchronousOp.run(TopicOpQueuer.java:77)
        at org.apache.hedwig.server.common.TopicOpQueuer.pushAndMaybeRun(TopicOpQueuer.java:105)
        at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.scanMessages(BookkeeperPersistenceManager.java:336)
        at org.apache.hedwig.server.persistence.ReadAheadCache$ScanRequestWrapper.performRequest(ReadAheadCache.java:704)
        at org.apache.hedwig.server.persistence.ReadAheadCache.run(ReadAheadCache.java:291)
        at java.lang.Thread.run(Thread.java:662)

{quote}

Topic TOPIC has two ledgers, L1 and L2, each holding 100 entries.

1) All 100 entries in L1 have been delivered and consumed.
2) 100 entries have been written to L2 but not delivered.
3) L1 is deleted because all of its entries have been consumed.
4) The hub server shuts down.
5) TOPIC is recovered with only L2 and starts delivering from message 101.

TOPIC was expected to issue a read of entries [0-3] from L2, but the exception
log shows that a read of [100-103] was issued instead. L2 only holds entries
0 to 99, so the read fails and no further messages are delivered.
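
To make the offset concrete, here is a minimal sketch of the arithmetic. It
assumes an entry-id is the message seq-id minus the seq-id of the first message
in the ledger. The class and method names are made up for illustration and are
not taken from the Hedwig code base.

```java
// Illustrative only: models how a message seq-id maps to an entry-id inside
// a ledger. SeqIdMapping and entryId are hypothetical names, not Hedwig APIs.
final class SeqIdMapping {

    /** Entry-id within a ledger whose first message has seq-id startSeqId. */
    static long entryId(long seqId, long startSeqId) {
        return seqId - startSeqId;
    }

    public static void main(String[] args) {
        long firstUndelivered = 101; // L1 held messages 1..100

        // L2 really starts at seq-id 101, so message 101 is entry 0.
        long expected = entryId(firstUndelivered, 101);

        // After recovery L1's range is gone and L2 is assumed to start at
        // seq-id 1, so message 101 is looked up at entry 100.
        long actual = entryId(firstUndelivered, 1);

        System.out.println("expected first entry: " + expected);
        System.out.println("actual first entry:   " + actual);
    }
}
```

With the wrong start, the first read lands one past the last entry of L2, which
matches the "from entry-id: 100" line in the log.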

The root cause is that, when rebuilding the ledger ranges for a topic, we always
use 1 as the start seq-id of the first ledger (and 0 as the end of the previous
ledger), even when earlier consumed ledgers have been deleted:

{code}
        void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
            Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
            TopicInfo topicInfo = new TopicInfo();

            long startOfLedger = 1;

            while (lrIterator.hasNext()) {
                LedgerRange range = lrIterator.next();

                if (range.hasEndSeqIdIncluded()) {
                    // this means it was a valid and completely closed ledger
                    long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
                    topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
                    startOfLedger = endOfLedger + 1;
                    continue;
                }

                // If it doesn't have a valid end, it must be the last ledger
                if (lrIterator.hasNext()) {
                    String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
                                 + " is not the last one but still does not have an end seq-id";
                    logger.error(msg);
                    cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
                    return;
                }

                // The last ledger does not have a valid seq-id, lets try to
                // find it out
                recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
                return;
            }
{code}

{code}
                            long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges.lastKey();
                            LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
                                             .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
                            topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
                                    new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
{code}
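
One possible direction, sketched here under assumptions rather than as the
actual patch: seed the range rebuild with the real start seq-id of the oldest
surviving ledger instead of the constant 1. The sketch supposes that this value
(firstStartSeqId below) is recorded somewhere when consumed ledgers are deleted.
RangeRebuild, Range and rebuild are hypothetical names and do not exist in
Hedwig.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch, not the Hedwig implementation.
final class RangeRebuild {

    /** Minimal stand-in for a persisted, closed ledger range. */
    static final class Range {
        final long ledgerId;
        final long endSeqId; // last seq-id stored in the ledger, inclusive

        Range(long ledgerId, long endSeqId) {
            this.ledgerId = ledgerId;
            this.endSeqId = endSeqId;
        }
    }

    /**
     * Maps each range's end seq-id to its start seq-id.
     * firstStartSeqId is the (assumed to be recorded) seq-id of the first
     * message in the oldest surviving ledger; passing 1 reproduces the
     * behaviour described in this issue.
     */
    static TreeMap<Long, Long> rebuild(List<Range> ranges, long firstStartSeqId) {
        TreeMap<Long, Long> startByEnd = new TreeMap<>();
        long startOfLedger = firstStartSeqId;
        for (Range r : ranges) {
            startByEnd.put(r.endSeqId, startOfLedger);
            startOfLedger = r.endSeqId + 1; // next ledger starts right after
        }
        return startByEnd;
    }
}
```

The same seeding would have to replace the 0 used for prevLedgerEnd when the
last, unclosed ledger is recovered with an empty range map.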

