hangc0276 opened a new pull request #8284: URL: https://github.com/apache/pulsar/pull/8284
Fix #8229

### Bug Detail

Refer to the broker dump file info: https://gist.github.com/lhotari/c34db2664b15fc32c9f0e7ae8b37dbda#gistcomment-3491707

Take the first row of the extracted table as an example.

```shell
cursor.markDeletePosition = 2273599:-1
opread readPosition = 2282101:0
opread nextReadPosition = 2282101:0
cursor readPosition = 2273599:0
cursor writePosition = 2273599:4
cursor.ledger.currentLedger.lastAddConfirmed = -1
cursor.ledger.currentLedger.ledgerId = 2282101

SQL:
((cursor.markDeletePosition.ledgerId.toString() + ":") + cursor.markDeletePosition.entryId.toString()) AS "cursor.markDeletePosition",
((readPosition.ledgerId.toString() + ":") + readPosition.entryId.toString()) AS "opread readPosition",
((nextReadPosition.ledgerId.toString() + ":") + nextReadPosition.entryId.toString()) AS "opread nextReadPosition",
((cursor.readPosition.ledgerId.toString() + ":") + cursor.readPosition.entryId.toString()) AS "cursor readPosition",
((cursor.ledger.lastConfirmedEntry.ledgerId.toString() + ":") + cursor.ledger.lastConfirmedEntry.entryId.toString()) AS "cursor writePosition",
cursor.ledger.currentLedger.lastAddConfirmed,
cursor.ledger.currentLedger.ledgerId.toString() AS "cursor.ledger.currentLedger.ledgerId"
```

When `ManagedCursorImpl#asyncReadEntries` is called, `cursor.readPosition` is **2273599:0**. The cursor passes this position to the new read operation, `OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);`, which derives `op.readPosition` from it: `op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);`. Because `cursor.readPosition` does not exist in the managed ledger's `ledgers` map, `startReadOperationOnLedger` returns the earliest available ledger position and sets `op.readPosition` to **2282101:0**, while `cursor.readPosition` is still **2273599:0**.
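The fallback described above can be modeled roughly as follows. This is only a sketch: `ReadPositionDriftSketch`, `Position`, and `resolveReadPosition` are hypothetical stand-ins rather than Pulsar classes, and looking up the next ledger with `TreeMap.ceilingKey` is an assumption of the model, not a claim about how `startReadOperationOnLedger` is implemented.

```java
import java.util.TreeMap;

// Simplified model of the position mismatch; not Pulsar's actual classes.
class ReadPositionDriftSketch {

    // Hypothetical stand-in for PositionImpl: a (ledgerId, entryId) pair.
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Hypothetical stand-in for startReadOperationOnLedger: if the requested
    // ledger is unknown, fall back to the start of the next available ledger.
    static Position resolveReadPosition(TreeMap<Long, Long> ledgers, Position requested) {
        if (ledgers.containsKey(requested.ledgerId)) {
            return requested;
        }
        Long next = ledgers.ceilingKey(requested.ledgerId);
        if (next == null) {
            throw new IllegalStateException("no ledger at or after " + requested.ledgerId);
        }
        return new Position(next, 0);
    }

    public static void main(String[] args) {
        // Only ledger 2282101 is known; it has no confirmed entries yet.
        TreeMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(2282101L, 0L);

        Position cursorReadPosition = new Position(2273599L, 0);
        Position opReadPosition = resolveReadPosition(ledgers, cursorReadPosition);

        // The operation moves to the new ledger; the cursor's own position does not.
        System.out.println("cursor.readPosition = " + cursorReadPosition);
        System.out.println("op.readPosition     = " + opReadPosition);
    }
}
```

With the values from the dump, the model leaves the cursor at 2273599:0 while the operation resolves to 2282101:0, which is the divergence the rest of the analysis builds on.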
When `ManagedLedgerImpl#asyncReadEntries` is called with the constructed `OpReadEntry`, it calls `ManagedLedgerImpl#internalReadFromLedger`. The key variables are as follows:

```shell
ledger = 2282101:-1
lastPosition = 2273599:4
ledger.getId()[2282101] != lastPosition.getLedgerId() [2273599]
firstEntry = op.readPosition.getEntryId() = 0
lastEntryInLedger = ledger.getLastAddConfirmed = -1
```

Thus, execution enters the following branch:

```java
if (firstEntry > lastEntryInLedger) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name,
                ledger.getId(), lastEntryInLedger, firstEntry);
    }

    if (currentLedger == null || ledger.getId() != currentLedger.getId()) {
        // Cursor was placed past the end of one ledger, move it to the
        // beginning of the next ledger
        Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1);
        if (nextLedgerId != null) {
            opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0));
        } else {
            opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
        }
    }

    opReadEntry.checkReadCompletion();
    return;
}
```

Finally, it calls `opReadEntry.checkReadCompletion()`, which calls `ManagedCursor#hasMoreEntries` to check whether there are more entries to read. If `hasMoreEntries` returns true, it sets up another read thread to read more entries.

```java
public boolean hasMoreEntries() {
    // If writer and reader are on the same ledger, we just need to compare the entry id to know if we have more
    // entries.
    // If they are on different ledgers we have 2 cases :
    // * Writer pointing to valid entry --> should return true since we have available entries
    // * Writer pointing to "invalid" entry -1 (meaning no entries in that ledger) --> Need to check if the reader
    // is at the last entry in the previous ledger
    PositionImpl writerPosition = ledger.getLastPosition();
    if (writerPosition.getEntryId() != -1) {
        return readPosition.compareTo(writerPosition) <= 0;
    } else {
        // Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers
        // are in the middle
        return getNumberOfEntries() > 0;
    }
}
```

In `hasMoreEntries`, the key variables are `writerPosition` and `readPosition`. `writerPosition` is `cursor.ledger.lastConfirmedEntry`, which is `2273599:4`, and `readPosition` is `cursor.readPosition`, which is `2273599:0`. Since `2273599:0` compares less than or equal to `2273599:4`, `hasMoreEntries` always returns `true`, so the read falls into an infinite loop and creates a large number of read threads. The bug is that `op.readPosition` is not synced immediately with `cursor.readPosition`.

### Changes

1. Sync `op.readPosition` with `cursor.readPosition` before calling `checkReadCompletion`.

@sijie @jiazhai @codelipenghui @lhotari Please help take a look; we can discuss in this PR. Thanks.
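To make the loop and the effect of the change easier to see, here is a rough, self-contained model of the retry cycle using the positions from the dump. It is a sketch under stated assumptions: `ReadLoopSketch` and `countEmptyReads` are hypothetical names, positions are plain `long[]` pairs, and the iteration cap exists only so that the unsynced case terminates in the demo.

```java
// Simplified model of the retry loop; not Pulsar's actual classes.
class ReadLoopSketch {

    // Compare two (ledgerId, entryId) positions, ledger id first.
    static int compare(long[] a, long[] b) {
        int byLedger = Long.compare(a[0], b[0]);
        return byLedger != 0 ? byLedger : Long.compare(a[1], b[1]);
    }

    // Models only the first branch of hasMoreEntries (writer entry id != -1).
    static boolean hasMoreEntries(long[] cursorReadPosition, long[] writerPosition) {
        return compare(cursorReadPosition, writerPosition) <= 0;
    }

    // Counts how many empty reads happen before the loop stops,
    // capped at maxAttempts so the unsynced case terminates in this demo.
    static int countEmptyReads(boolean syncBeforeCheck, int maxAttempts) {
        long[] cursorReadPosition = {2273599L, 0L};
        long[] opReadPosition = {2282101L, 0L};
        long[] writerPosition = {2273599L, 4L};

        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++; // one read that finds no entries in ledger 2282101
            if (syncBeforeCheck) {
                // Modeled fix: copy the operation's position to the cursor
                // before asking whether more entries exist.
                cursorReadPosition = opReadPosition;
            }
            if (!hasMoreEntries(cursorReadPosition, writerPosition)) {
                break;
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println("without sync: " + countEmptyReads(false, 1000));
        System.out.println("with sync:    " + countEmptyReads(true, 1000));
    }
}
```

In this model the unsynced variant runs until the cap (1000 attempts), while the synced variant stops after a single attempt, because 2282101:0 no longer compares less than or equal to the writer position 2273599:4.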
