[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13484987#comment-13484987
 ] 

Hudson commented on BOOKKEEPER-439:
-----------------------------------

Integrated in bookkeeper-trunk #776 (See 
[https://builds.apache.org/job/bookkeeper-trunk/776/])
    BOOKKEEPER-439: No more messages delivered after deleted consumed ledgers. 
(sijie via ivank) (Revision 1402501)

     Result = UNSTABLE
ivank : 
Files : 
* /zookeeper/bookkeeper/trunk/CHANGES.txt
* 
/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
* 
/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
* 
/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
* 
/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
* 
/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
* 
/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
* 
/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
* 
/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestBookKeeperPersistenceManager.java

                
> No more messages delivered after deleted consumed ledgers.
> ----------------------------------------------------------
>
>                 Key: BOOKKEEPER-439
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-439
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-server
>    Affects Versions: 4.1.0, 4.2.0
>            Reporter: Sijie Guo
>            Assignee: Sijie Guo
>            Priority: Blocker
>             Fix For: 4.2.0
>
>         Attachments: BOOKKEEPER-439.diff, BOOKKEEPER-439.diff, 
> BOOKKEEPER-439.diff, BOOKKEEPER-439.diff, TopicMetadataAddStartSeqId.patch
>
>
> We encountered exception as below:
> {quote}
> 2012-10-18 09:27:27,248 - DEBUG 
> [CacheThread:BookkeeperPersistenceManager$RangeScanOp@247] - Issuing a bk 
> read for ledger: L2 from entry-id: 100 to entry-id: 103
> 2012-10-18 09:27:27,248 - ERROR 
> [CacheThread:BookkeeperPersistenceManager$RangeScanOp$2@261] - Error while 
> reading from ledger: L2 for topic: TOPIC
> org.apache.bookkeeper.client.BKException$BKReadException
>         at 
> org.apache.bookkeeper.client.BKException.create(BKException.java:48)
>         at 
> org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp$2.safeReadComplete(BookkeeperPersistenceManager.java:260)
>         at 
> org.apache.hedwig.zookeeper.SafeAsynBKCallback$ReadCallback.readComplete(SafeAsynBKCallback.java:61)
>         at 
> org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:380)
>         at 
> org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.read(BookkeeperPersistenceManager.java:252)
>         at 
> org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.startReadingFrom(BookkeeperPersistenceManager.java:327)
>         at 
> org.apache.hedwig.server.persistence.BookkeeperPersistenceManager$RangeScanOp.runInternal(BookkeeperPersistenceManager.java:217)
>         at 
> org.apache.hedwig.server.common.TopicOpQueuer$SynchronousOp.run(TopicOpQueuer.java:77)
>         at 
> org.apache.hedwig.server.common.TopicOpQueuer.pushAndMaybeRun(TopicOpQueuer.java:105)
>         at 
> org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.scanMessages(BookkeeperPersistenceManager.java:336)
>         at 
> org.apache.hedwig.server.persistence.ReadAheadCache$ScanRequestWrapper.performRequest(ReadAheadCache.java:704)
>         at 
> org.apache.hedwig.server.persistence.ReadAheadCache.run(ReadAheadCache.java:291)
>         at java.lang.Thread.run(Thread.java:662)
> {quote}
> topic TOPIC has 2 ledgers L1, L2, each ledger has 100 entries.
> 1) all the 100 entries in L1 has been delivered and consumed.
> 2) 100 entries have been wrote to L2 but not delivered.
> 3) L1 is deleted since all its entries have been consumed.
> 4) hub server shuts down
> 5) TOPIC recovered L2 and started delivering from 101.
> TOPIC was expected to issue a read [0-3] from L2, but a read [100-103] was 
> issued from the exception log, so no entries would be expected to read from 
> L2 at [100-103].
> The problem of this issue is that we used 0 and 1 for the start of message id 
> and ledger id even we had some consumed ledgers deleted.
> {code}
>         void processTopicLedgerRanges(final LedgerRanges ranges, final 
> Version version) {
>             Iterator<LedgerRange> lrIterator = 
> ranges.getRangesList().iterator();
>             TopicInfo topicInfo = new TopicInfo();
>             long startOfLedger = 1;
>             while (lrIterator.hasNext()) {
>                 LedgerRange range = lrIterator.next();
>                 if (range.hasEndSeqIdIncluded()) {
>                     // this means it was a valid and completely closed ledger
>                     long endOfLedger = 
> range.getEndSeqIdIncluded().getLocalComponent();
>                     topicInfo.ledgerRanges.put(endOfLedger, new 
> InMemoryLedgerRange(range,           startOfLedger));                         
>     startOfLedger = endOfLedger + 1;
>                     continue;
>                 }        
>                 // If it doesn't have a valid end, it must be the last ledger
>                 if (lrIterator.hasNext()) {
>                     String msg = "Ledger-id: " + range.getLedgerId() + " for 
> topic: " + topic.       toStringUtf8()                                        
>     + " is not the last one but still does not have an end seq-id";
>                     logger.error(msg);
>                     cb.operationFailed(ctx, new 
> PubSubException.UnexpectedConditionException(msg));
>                     return;                }
>                 // The last ledger does not have a valid seq-id, lets try to
>                 // find it out
>                 recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), 
> version, topicInfo);
>                 return;
>             }
> {code}
> {code}
>                             long prevLedgerEnd = 
> topicInfo.ledgerRanges.isEmpty() ? 0 : topicInfo.   ledgerRanges
>                                                  .lastKey();
>                             LedgerRange lr = 
> LedgerRange.newBuilder().setLedgerId(ledgerId)
>                                              
> .setEndSeqIdIncluded(lastMessage.getMsgId()).build();
>                             
> topicInfo.ledgerRanges.put(lr.getEndSeqIdIncluded().getLocalComponent(),
>                                     new InMemoryLedgerRange(lr, prevLedgerEnd 
> + 1, lh));
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to