hangc0276 opened a new pull request #8284:
URL: https://github.com/apache/pulsar/pull/8284


   Fix #8229 
   
   ### Bug Detail
   Refer to the broker dump file analysis:
https://gist.github.com/lhotari/c34db2664b15fc32c9f0e7ae8b37dbda#gistcomment-3491707
   
   Take the first row of the extracted table as an example.
   
   ```shell
   cursor.markDeletePosition = 2273599:-1
   opread readPosition = 2282101:0
   opread nextReadPosition = 2282101:0
   cursor readPosition = 2273599:0
   cursor writePosition = 2273599:4
   cursor.ledger.currentLedger.lastAddConfirmed = -1
   cursor.ledger.currentLedger.ledgerId = 2282101
   
   SQL:
   ((cursor.markDeletePosition.ledgerId.toString() + ":") + cursor.markDeletePosition.entryId.toString()) AS "cursor.markDeletePosition",
   ((readPosition.ledgerId.toString() + ":") + readPosition.entryId.toString()) AS "opread readPosition",
   ((nextReadPosition.ledgerId.toString() + ":") + nextReadPosition.entryId.toString()) AS "opread nextReadPosition",
   ((cursor.readPosition.ledgerId.toString() + ":") + cursor.readPosition.entryId.toString()) AS "cursor readPosition",
   ((cursor.ledger.lastConfirmedEntry.ledgerId.toString() + ":") + cursor.ledger.lastConfirmedEntry.entryId.toString()) AS "cursor writePosition",
   cursor.ledger.currentLedger.lastAddConfirmed,
   cursor.ledger.currentLedger.ledgerId.toString() AS "cursor.ledger.currentLedger.ledgerId"
   ```
   When `ManagedCursorImpl#asyncReadEntries` is called, cursor.readPosition is **2273599:0**. This position is used to construct the OpReadEntry (`OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);`), which in turn sets op.readPosition via `op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);`. Because the ledger of cursor.readPosition (2273599) no longer exists in the managed ledger's `ledgers` map, `startReadOperationOnLedger` returns the first position of the earliest available ledger, so op.readPosition is set to **2282101:0**, while cursor.readPosition remains **2273599:0**.
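   To make the divergence concrete, here is a minimal, self-contained Java model. It is a sketch, not Pulsar code: `Pos`, `ReadStartModel`, and `startRead` are hypothetical names, and it assumes `startReadOperationOnLedger` behaves like a `TreeMap.ceilingKey` lookup that jumps to entry 0 of the next ledger still present in the `ledgers` map.

   ```java
   import java.util.TreeMap;

   // Hypothetical stand-in for Pulsar's PositionImpl: (ledgerId, entryId).
   record Pos(long ledgerId, long entryId) {
       @Override
       public String toString() {
           return ledgerId + ":" + entryId;
       }
   }

   class ReadStartModel {
       // Simplified model of startReadOperationOnLedger (assumption): if the requested ledger is
       // no longer in the ledgers map, start from entry 0 of the next ledger that still exists.
       static Pos startRead(TreeMap<Long, Long> ledgers, Pos requested) {
           Long ledgerId = ledgers.ceilingKey(requested.ledgerId());
           if (ledgerId == null) {
               throw new IllegalStateException("no ledger at or after " + requested);
           }
           return ledgerId == requested.ledgerId() ? requested : new Pos(ledgerId, 0);
       }

       public static void main(String[] args) {
           // ledgerId -> lastAddConfirmed; only the current, still-empty ledger is left.
           TreeMap<Long, Long> ledgers = new TreeMap<>();
           ledgers.put(2282101L, -1L);

           Pos cursorReadPosition = new Pos(2273599, 0);
           Pos opReadPosition = startRead(ledgers, cursorReadPosition);

           // The op moved forward, but nothing wrote the new position back to the cursor.
           System.out.println("cursor.readPosition = " + cursorReadPosition); // 2273599:0
           System.out.println("op.readPosition     = " + opReadPosition);     // 2282101:0
       }
   }
   ```

   The two positions now disagree, and only op.readPosition reflects the ledger that is actually read.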
   
   When `ManagedLedgerImpl#asyncReadEntries` is called with the constructed OpReadEntry, it calls `ManagedLedgerImpl#internalReadFromLedger`. The key variables are as follows:
   ```shell
   ledger = 2282101:-1
   lastPosition = 2273599:4
   ledger.getId()[2282101] != lastPosition.getLedgerId() [2273599]
   firstEntry = op.readPosition.getEntryId() = 0
   lastEntryInLedger = ledger.getLastAddConfirmed = -1
   ```
   Thus, it goes into the following branch:
   ```java
   if (firstEntry > lastEntryInLedger) {
       if (log.isDebugEnabled()) {
           log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name,
                   ledger.getId(), lastEntryInLedger, firstEntry);
       }

       if (currentLedger == null || ledger.getId() != currentLedger.getId()) {
           // Cursor was placed past the end of one ledger, move it to the
           // beginning of the next ledger
           Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1);
           if (nextLedgerId != null) {
               opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0));
           } else {
               opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
           }
       }

       opReadEntry.checkReadCompletion();
       return;
   }
   ```
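   The branch above only updates the read position when the ledger being read is not the current ledger. The following sketch models that decision with ledger IDs in place of the real `LedgerHandle` objects; `Position`, `PastEndBranchModel`, and `positionUpdate` are hypothetical names, and an empty `Optional` stands for "no `updateReadPosition` call". With the dump values, ledger 2282101 is also the current ledger, so the update is skipped.

   ```java
   import java.util.Optional;
   import java.util.TreeMap;

   // Hypothetical stand-in for PositionImpl.
   record Position(long ledgerId, long entryId) {
       @Override
       public String toString() {
           return ledgerId + ":" + entryId;
       }
   }

   class PastEndBranchModel {
       // Model of the "firstEntry > lastEntryInLedger" branch: returns the position that would be
       // passed to updateReadPosition, or empty when the branch leaves the read position alone.
       static Optional<Position> positionUpdate(TreeMap<Long, Long> ledgers, long readLedgerId,
                                                Long currentLedgerId, long firstEntry, long lastEntryInLedger) {
           if (firstEntry <= lastEntryInLedger) {
               throw new IllegalArgumentException("entries are available; branch not taken");
           }
           if (currentLedgerId == null || readLedgerId != currentLedgerId) {
               // Past the end of an older ledger: jump to entry 0 of the next ledger.
               Long next = ledgers.ceilingKey(readLedgerId + 1);
               return Optional.of(new Position(next != null ? next : readLedgerId + 1, 0));
           }
           // Reading the current ledger: no updateReadPosition call.
           return Optional.empty();
       }

       public static void main(String[] args) {
           TreeMap<Long, Long> ledgers = new TreeMap<>();
           ledgers.put(2282101L, -1L);

           // Dump values: ledger 2282101 is the current ledger, firstEntry = 0, lastEntryInLedger = -1.
           System.out.println(positionUpdate(ledgers, 2282101L, 2282101L, 0, -1)); // Optional.empty
       }
   }
   ```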
   Finally, it calls `opReadEntry.checkReadCompletion()`, which calls `ManagedCursor#hasMoreEntries` to check whether there are more entries to read. If `hasMoreEntries` returns true, it sets up another read thread to read more entries.
   
   ```java
   public boolean hasMoreEntries() {
       // If writer and reader are on the same ledger, we just need to compare the entry id to know if we have more
       // entries.
       // If they are on different ledgers we have 2 cases :
       // * Writer pointing to valid entry --> should return true since we have available entries
       // * Writer pointing to "invalid" entry -1 (meaning no entries in that ledger) --> Need to check if the reader
       // is at the last entry in the previous ledger
       PositionImpl writerPosition = ledger.getLastPosition();
       if (writerPosition.getEntryId() != -1) {
           return readPosition.compareTo(writerPosition) <= 0;
       } else {
           // Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers
           // are in the middle
           return getNumberOfEntries() > 0;
       }
   }
   ```
   In `hasMoreEntries`, the key variables are `writerPosition` and `readPosition`. `writerPosition` is cursor.ledger.lastConfirmedEntry, which is `2273599:4`, and `readPosition` is cursor.readPosition, which is `2273599:0`. Because the writer's entry ID is not -1 and `2273599:0` compares less than `2273599:4`, `hasMoreEntries` always returns `true`, so the cursor falls into an infinite loop and creates a large number of read threads.
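   Plugging the dump values into a simplified copy of `hasMoreEntries` shows the loop. This is a model, not Pulsar code: `Position` and `HasMoreEntriesModel` are hypothetical, the `getNumberOfEntries() > 0` fallback is passed in as a boolean, and the follow-up reads are capped so the demo terminates.

   ```java
   // Hypothetical stand-in for PositionImpl, ordered by (ledgerId, entryId).
   record Position(long ledgerId, long entryId) implements Comparable<Position> {
       @Override
       public int compareTo(Position o) {
           int c = Long.compare(ledgerId, o.ledgerId);
           return c != 0 ? c : Long.compare(entryId, o.entryId);
       }
   }

   class HasMoreEntriesModel {
       // Same decision logic as hasMoreEntries; the getNumberOfEntries() > 0 fallback
       // is passed in as a boolean.
       static boolean hasMoreEntries(Position readPosition, Position writerPosition, boolean fallback) {
           if (writerPosition.entryId() != -1) {
               return readPosition.compareTo(writerPosition) <= 0;
           }
           return fallback;
       }

       // Counts the follow-up reads checkReadCompletion would schedule, capped at maxReads.
       // Each empty read leaves the cursor's readPosition unchanged, so every check sees the same value.
       static int scheduledReads(Position cursorReadPosition, Position writerPosition, int maxReads) {
           int reads = 0;
           while (reads < maxReads && hasMoreEntries(cursorReadPosition, writerPosition, false)) {
               reads++;
           }
           return reads;
       }

       public static void main(String[] args) {
           Position writer = new Position(2273599, 4);      // cursor.ledger.lastConfirmedEntry
           Position staleCursor = new Position(2273599, 0); // cursor.readPosition
           System.out.println(hasMoreEntries(staleCursor, writer, false)); // true
           System.out.println(scheduledReads(staleCursor, writer, 5));     // 5, only the cap stops it
       }
   }
   ```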
   
   The root cause is that op.readPosition is not synced with cursor.readPosition immediately.
   
   ### Changes
   1. Sync op.readPosition with cursor.readPosition before calling `checkReadCompletion`.
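   A minimal sketch of the idea behind the change, assuming the sync means writing op.readPosition (2282101:0) back to the cursor before the completion check. The actual patch may differ; `Cursor`, `OpRead`, `completeEmptyRead`, and `FixModel` are hypothetical names used only for illustration.

   ```java
   // Hypothetical stand-in for PositionImpl, ordered by (ledgerId, entryId).
   record Position(long ledgerId, long entryId) implements Comparable<Position> {
       @Override
       public int compareTo(Position o) {
           int c = Long.compare(ledgerId, o.ledgerId);
           return c != 0 ? c : Long.compare(entryId, o.entryId);
       }
   }

   // Hypothetical cursor holding only what hasMoreEntries needs.
   class Cursor {
       Position readPosition;
       final Position writerPosition; // stands in for ledger.getLastPosition()

       Cursor(Position readPosition, Position writerPosition) {
           this.readPosition = readPosition;
           this.writerPosition = writerPosition;
       }

       boolean hasMoreEntries() {
           if (writerPosition.entryId() != -1) {
               return readPosition.compareTo(writerPosition) <= 0;
           }
           return false; // simplification: the real code falls back to getNumberOfEntries() > 0
       }
   }

   // Hypothetical op: readPosition is the value returned by startReadOperationOnLedger.
   class OpRead {
       final Cursor cursor;
       final Position readPosition;

       OpRead(Cursor cursor, Position readPosition) {
           this.cursor = cursor;
           this.readPosition = readPosition;
       }

       // Returns true if another read would be scheduled after an empty read.
       boolean completeEmptyRead(boolean syncReadPosition) {
           if (syncReadPosition) {
               cursor.readPosition = readPosition; // the sync step
           }
           return cursor.hasMoreEntries();
       }
   }

   class FixModel {
       public static void main(String[] args) {
           Position writer = new Position(2273599, 4);
           Position opPosition = new Position(2282101, 0);
           OpRead unsynced = new OpRead(new Cursor(new Position(2273599, 0), writer), opPosition);
           OpRead synced = new OpRead(new Cursor(new Position(2273599, 0), writer), opPosition);
           System.out.println(unsynced.completeEmptyRead(false)); // true: re-read forever
           System.out.println(synced.completeEmptyRead(true));    // false: loop stops
       }
   }
   ```

   Once the cursor's read position is 2282101:0, it sorts after the writer position 2273599:4, so `hasMoreEntries` returns false and no further read is scheduled.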
   
   @sijie @jiazhai @codelipenghui @lhotari Please take a look; we can discuss in this PR. Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

