This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c9651ecb23b [fix][broker]Fix mutex never released when trimming
(#17911)
c9651ecb23b is described below
commit c9651ecb23b07ec8a9a00a001da222b479fb78c3
Author: feynmanlin <[email protected]>
AuthorDate: Tue Oct 11 10:31:41 2022 +0800
[fix][broker]Fix mutex never released when trimming (#17911)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 +++++++++--
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 34 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ad999472bbc..729cec2f31c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2585,7 +2585,14 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return;
}
- advanceCursorsIfNecessary(ledgersToDelete);
+ try {
+ advanceCursorsIfNecessary(ledgersToDelete);
+ } catch (LedgerNotExistException e) {
+ log.info("First non deleted Ledger is not found, stop
trimming");
+ metadataMutex.unlock();
+ trimmerMutex.unlock();
+ return;
+ }
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
@@ -2658,7 +2665,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
* This is to make sure that the `consumedEntries` counter is correctly
updated with the number of skipped
* entries and the stats are reported correctly.
*/
- private void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) {
+ @VisibleForTesting
+ void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) throws
LedgerNotExistException {
if (ledgersToDelete.isEmpty()) {
return;
}
@@ -2666,7 +2674,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// need to move mark delete for non-durable cursors to the first
ledger NOT marked for deletion
// calling getNumberOfEntries latter for a ledger that is already
deleted will be problematic and return
// incorrect results
- long firstNonDeletedLedger =
ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() -
1).getLedgerId());
+ Long firstNonDeletedLedger =
ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() -
1).getLedgerId());
+ if (firstNonDeletedLedger == null) {
+ throw new LedgerNotExistException("First non deleted Ledger is not
found");
+ }
PositionImpl highestPositionToDelete = new
PositionImpl(firstNonDeletedLedger, -1);
cursors.forEach(cursor -> {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a07a84f70bd..0c3930f20e6 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,9 +22,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -3519,6 +3521,38 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testLockReleaseWhenTrimLedger() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(1);
+
+ ManagedLedgerImpl ledger =
spy((ManagedLedgerImpl)factory.open("testLockReleaseWhenTrimLedger", config));
+ doThrow(new ManagedLedgerException.LedgerNotExistException("First non
deleted Ledger is not found"))
+ .when(ledger).advanceCursorsIfNecessary(any());
+ final int entries = 10;
+ ManagedCursor cursor = ledger.openCursor("test-cursor" +
UUID.randomUUID());
+ for (int i = 0; i < entries; i++) {
+ ledger.addEntry(String.valueOf(i).getBytes(Encoding));
+ }
+ List<Entry> entryList = cursor.readEntries(entries);
+ assertEquals(entryList.size(), entries);
+ assertEquals(ledger.ledgers.size(), entries);
+ assertEquals(ledger.ledgerCache.size(), entries - 1);
+ cursor.clearBacklog();
+ ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ // Cleanup fails because ManagedLedgerNotFoundException is thrown
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ledger.ledgers.size(), entries);
+ assertEquals(ledger.ledgerCache.size(), entries - 1);
+ });
+ // The lock is released even if an ManagedLedgerNotFoundException
occurs, so it can be called repeatedly
+ Awaitility.await().untilAsserted(() ->
+ verify(ledger, atLeast(2)).advanceCursorsIfNecessary(any()));
+ cursor.close();
+ ledger.close();
+ }
+
@Test
public void testInvalidateReadHandleWhenConsumed() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();