[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13484087#comment-13484087
 ] 

Ivan Kelly commented on BOOKKEEPER-439:
---------------------------------------

Why are you changing the return code for delete?

You never close the ledger opened in getStartSeqIdToProcessTopicLedgerRanges().
You never close the ledger in recoverLastTopicLedgerAndOpenNewOne().

In getStartSeqIdToProcessTopicLedgerRanges(), for
{code}
if (!range.hasEndSeqIdIncluded()) {
    // process topic ledger ranges directly
    processTopicLedgerRanges(rangesList, version, START_SEQ_ID);
    return;
}
{code}
add a comment explaining that, because the ledger will be recovered, the start 
seq id will be discovered at that point.

For the NoSuchLedgerExistsException case in 
getStartSeqIdToProcessTopicLedgerRanges(),
shouldn't we try the next ledger? What if the next ledger is also 
missing the start id?
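
One possible reading of the question above, as a minimal sketch: keep walking the ledger list until some ledger yields a start seq id, and report failure only if none does. Everything here is hypothetical and not taken from the patch: the class StartSeqIdFallback, the method findStartSeqId, and the firstSeqIdReader callback (which stands in for opening a ledger and reading its first entry, returning empty where the real code would see NoSuchLedgerExistsException).

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.function.LongFunction;

class StartSeqIdFallback {
    /**
     * Hypothetical helper: walks the ledger ids in order and returns the
     * first start seq id that can be read. firstSeqIdReader is assumed to
     * return an empty OptionalLong for a ledger that no longer exists.
     */
    static OptionalLong findStartSeqId(List<Long> ledgerIds,
                                       LongFunction<OptionalLong> firstSeqIdReader) {
        for (long ledgerId : ledgerIds) {
            OptionalLong start = firstSeqIdReader.apply(ledgerId);
            if (start.isPresent()) {
                return start; // first ledger that still exists wins
            }
            // Ledger is missing: fall through and try the next one.
        }
        // Every ledger was missing; the caller has to decide what to do.
        return OptionalLong.empty();
    }
}
```

Whether skipping a missing ledger is safe depends on why it is missing, which the sketch does not try to decide.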

In UpdateLedgerOp#run, doRemove is a confusing name. I expected it to be used 
later on to decide whether the update was done, and only when I looked at the 
later code did I realise that keysToRemove.empty() is what is used. 
I would rename it to foundUnconsumedLedger.

It would be good to have a backwards compat test for this.
                
> No more messages delivered after deleted consumed ledgers.
> ----------------------------------------------------------
>
>                 Key: BOOKKEEPER-439
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-439
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-server
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Sijie Guo
>            Assignee: Sijie Guo
>            Priority: Blocker
>             Fix For: 4.2.0
>
>         Attachments: BOOKKEEPER-439.diff, BOOKKEEPER-439.diff, 
> BOOKKEEPER-439.diff, TopicMetadataAddStartSeqId.patch
>
>
> We encountered the exception below:
> {quote}
> 2012-10-18 09:27:27,248 - DEBUG [CacheThread:BookkeeperPersistenceManager$RangeScanOp@247] - Issuing a bk read for ledger: L2 from entry-id: 100 to entry-id: 103
> 2012-10-18 09:27:27,248 - ERROR [CacheThread:BookkeeperPersistenceManager$RangeScanOp$2@261] - Error while reading from ledger: L2 for topic: TOPIC
> org.apache.bookkeeper.client.BKException$BKReadException
>         at org.apache.bookkeeper.client.BKException.create(BKException.java:48)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp$2.safeReadComplete(BookkeeperPersistenceManager.java:260)
>         at org.apache.hedwig.zookeeper.SafeAsynBKCallback$ReadCallback.readComplete(SafeAsynBKCallback.java:61)
>         at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:380)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.read(BookkeeperPersistenceManager.java:252)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.startReadingFrom(BookkeeperPersistenceManager.java:327)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.runInternal(BookkeeperPersistenceManager.java:217)
>         at org.apache.hedwig.server.common.TopicOpQueuer$SynchronousOp.run(TopicOpQueuer.java:77)
>         at org.apache.hedwig.server.common.TopicOpQueuer.pushAndMaybeRun(TopicOpQueuer.java:105)
>         at org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.scanMessages(BookkeeperPersistenceManager.java:336)
>         at org.apache.hedwig.server.persistence.ReadAheadCache$ScanRequestWrapper.performRequest(ReadAheadCache.java:704)
>         at org.apache.hedwig.server.persistence.ReadAheadCache.run(ReadAheadCache.java:291)
>         at java.lang.Thread.run(Thread.java:662)
> {quote}
> Topic TOPIC has 2 ledgers, L1 and L2; each ledger has 100 entries.
> 1) All 100 entries in L1 have been delivered and consumed.
> 2) 100 entries have been written to L2 but not delivered.
> 3) L1 is deleted since all its entries have been consumed.
> 4) The hub server shuts down.
> 5) TOPIC recovered L2 and started delivering from 101.
> TOPIC was expected to issue a read [0-3] from L2, but the exception log shows 
> that a read [100-103] was issued instead, and no entries can be read from 
> L2 at [100-103].
> The cause of this issue is that we used 0 and 1 as the start of the message id 
> and ledger id even when some consumed ledgers had been deleted.
> {code}
>         void processTopicLedgerRanges(final LedgerRanges ranges, final Version version) {
>             Iterator<LedgerRange> lrIterator = ranges.getRangesList().iterator();
>             TopicInfo topicInfo = new TopicInfo();
>             long startOfLedger = 1;
>             while (lrIterator.hasNext()) {
>                 LedgerRange range = lrIterator.next();
>                 if (range.hasEndSeqIdIncluded()) {
>                     // this means it was a valid and completely closed ledger
>                     long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
>                     topicInfo.ledgerRanges.put(endOfLedger, new InMemoryLedgerRange(range, startOfLedger));
>                     startOfLedger = endOfLedger + 1;
>                     continue;
>                 }
>                 // If it doesn't have a valid end, it must be the last ledger
>                 if (lrIterator.hasNext()) {
>                     String msg = "Ledger-id: " + range.getLedgerId() + " for topic: " + topic.toStringUtf8()
>                                  + " is not the last one but still does not have an end seq-id";
>                     logger.error(msg);
>                     cb.operationFailed(ctx, new PubSubException.UnexpectedConditionException(msg));
>                     return;
>                 }
>                 // The last ledger does not have a valid seq-id, lets try to
>                 // find it out
>                 recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), version, topicInfo);
>                 return;
>             }
> {code}
> {code}
>                             long prevLedgerEnd = topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.ledgerRanges.lastKey();
>                             LedgerRange lr = LedgerRange.newBuilder().setLedgerId(ledgerId)
>                                              .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
>                             topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
>                                     new InMemoryLedgerRange(lr, prevLedgerEnd + 1, lh));
> {code}
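
The arithmetic behind the [100-103] read in the issue description can be sketched as follows. This is an illustration only, assuming that a message's entry id inside a ledger is its seq id minus the seq id of the ledger's first message; the class EntryIdMapping and the method entryIdFor are hypothetical names, not Hedwig code.

```java
class EntryIdMapping {
    // Assumption: entry ids within a ledger are zero-based, so the entry id
    // of a message is its seq id minus the ledger's start seq id.
    static long entryIdFor(long seqId, long startSeqIdOfLedger) {
        return seqId - startSeqIdOfLedger;
    }

    public static void main(String[] args) {
        long firstUndelivered = 101; // first message stored in L2

        // After L1 is deleted, no earlier range is known, so the start of L2
        // falls back to the hard-coded 1 and the read targets entry 100.
        System.out.println(entryIdFor(firstUndelivered, 1));

        // With the real start seq id of L2 (101), the read targets entry 0.
        System.out.println(entryIdFor(firstUndelivered, 101));
    }
}
```

Under that assumption, the four messages 101 to 104 map to entries [100-103] with the default start and to [0-3] with the recorded start, which matches the log and the expected behaviour described above.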

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
