This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7c37e5692a78e2aec9fc3d436f4f7adf5dd488a0 Author: Yunze Xu <[email protected]> AuthorDate: Thu Jul 7 21:27:13 2022 +0800 [improve][broker] Recycle OpReadEntry in some corner cases (#16399) ### Motivation `ManagedCursorImpl` maintains a field `waitingReadOp` as the cache of the `OpReadEntry` created in `asyncReadEntriesOrWait` when there are no more entries to read. However, there are two cases that the created `OpReadEntry` are not recycled: 1. `asyncReadEntriesOrWait` is called repeatedly when `waitingReadOp` is not null and there are no more entries. The new created `OpReadEntry` cannot pass the CAS check but it's not recycled. 2. `cancelPendingReadRequest` is called. The `waitingReadOp` is just set with null and the previous reference is not recycled. ### Modifications For the two cases described above, recycle the `OpReadEntry` objects. ### Verifying this change Add `testOpReadEntryRecycle` to reproduce the corner cases and verify the count of `recycle()` calls. (cherry picked from commit 6cec62e89629c258f9458a726ff8ba3b644788b7) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 7 ++- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 62 ++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 93ec27f2dc8..87850624751 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -858,6 +858,7 @@ public class ManagedCursorImpl implements ManagedCursor { ctx, maxPosition); if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) { + op.recycle(); callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"), ctx); return; @@ -940,7 +941,11 @@ public class ManagedCursorImpl implements ManagedCursor { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name); } - return WAITING_READ_OP_UPDATER.getAndSet(this, null) != null; + final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndSet(this, null); + if (op != null) { + op.recycle(); + } + return op != null; } public boolean hasPendingReadRequest() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index d64cac6bbd3..8944ba57a56 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; @@ -42,10 +43,12 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -57,6 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -93,6 +97,8 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.common.api.proto.IntRange; import org.awaitility.Awaitility; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -3752,5 +3758,61 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { Awaitility.await().untilAsserted(() -> assertTrue(flag.get())); } + @Test + public void testOpReadEntryRecycle() throws Exception { + final Map<OpReadEntry, AtomicInteger> opReadEntryToRecycleCount = new ConcurrentHashMap<>(); + final Supplier<OpReadEntry> createOpReadEntry = () -> { + final OpReadEntry mockedOpReadEntry = mock(OpReadEntry.class); + doAnswer(__ -> opReadEntryToRecycleCount.computeIfAbsent(mockedOpReadEntry, + ignored -> new AtomicInteger(0)).getAndIncrement() + ).when(mockedOpReadEntry).recycle(); + return mockedOpReadEntry; + }; + + @Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class); + mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any())) + .thenAnswer(__ -> createOpReadEntry.get()); + + final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig(); + ledgerConfig.setNewEntriesCheckDelayInMillis(10); + final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", ledgerConfig); + final ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("my_cursor"); + final List<ManagedLedgerException> exceptions = new ArrayList<>(); + final AtomicBoolean readEntriesSuccess = new AtomicBoolean(false); + final ReadEntriesCallback callback = new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List<Entry> entries, Object ctx) { + readEntriesSuccess.set(true); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + exceptions.add(exception); + } + }; + + final int numReadRequests = 3; + for (int i = 0; i < numReadRequests; i++) { + cursor.asyncReadEntriesOrWait(1, callback, null, new PositionImpl(0, 0)); + } + Awaitility.await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> assertEquals(ledger.waitingCursors.size(), 1)); + assertTrue(cursor.cancelPendingReadRequest()); + + ledger.addEntry(new byte[1]); + Awaitility.await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> assertTrue(ledger.waitingCursors.isEmpty())); + assertFalse(readEntriesSuccess.get()); + + assertEquals(exceptions.size(), numReadRequests - 1); + exceptions.forEach(e -> assertEquals(e.getMessage(), "We can only have a single waiting callback")); + assertEquals(opReadEntryToRecycleCount.size(), 3); + assertEquals(opReadEntryToRecycleCount.entrySet().stream() + .map(Map.Entry::getValue) + .map(AtomicInteger::get) + .collect(Collectors.toList()), + Arrays.asList(1, 1, 1)); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); }
