This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 44aaa47b5fd681e05a1fa19d201c9181a0d63864 Author: feynmanlin <[email protected]> AuthorDate: Thu Sep 30 21:45:19 2021 +0800 Optimize the memory usage of Cache Eviction (#12045) (cherry picked from commit 0c22e0fab596314bd5462f607ce6d03cb96ed484) --- .../mledger/impl/ManagedCursorContainer.java | 38 ++++++++++++++++---- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 30 ++++++++++------ .../mledger/impl/ManagedCursorContainerTest.java | 42 +++++++++++++++++++++- 3 files changed, 91 insertions(+), 19 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index db887f0..848ce54 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -60,8 +60,23 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> { } } - // Used to keep track of slowest cursor. Contains all of all the cursors except for non-durable cursors - // Since we do need to keep track of non-durable cursors. + public enum CursorType { + DurableCursor, + NonDurableCursor, + ALL + } + + public ManagedCursorContainer() { + cursorType = CursorType.DurableCursor; + } + + public ManagedCursorContainer(CursorType cursorType) { + this.cursorType = cursorType; + } + + private final CursorType cursorType; + + // Used to keep track of slowest cursor. Contains all of all active cursors. 
private final ArrayList<Item> heap = Lists.newArrayList(); // Maps a cursor to its position in the heap @@ -76,8 +91,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> { Item item = new Item(cursor, heap.size()); cursors.put(cursor.getName(), item); - // don't need to add non-durable cursors - if (cursor.isDurable()) { + if (shouldTrackInHeap(cursor)) { heap.add(item); siftUp(item); } @@ -86,6 +100,16 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> { } } + private boolean shouldTrackInHeap(ManagedCursor cursor) { + return CursorType.ALL.equals(cursorType) + || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType)) + || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType)); + } + + public PositionImpl getSlowestReadPositionForActiveCursors() { + return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition(); + } + public ManagedCursor get(String name) { long stamp = rwLock.readLock(); try { @@ -101,7 +125,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> { try { Item item = cursors.remove(name); - if (item.cursor.isDurable()) { + if (shouldTrackInHeap(item.cursor)) { // Move the item to the right end of the heap to be removed Item lastItem = heap.get(heap.size() - 1); swap(item, lastItem); @@ -132,7 +156,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> { } - if (item.cursor.isDurable()) { + if (shouldTrackInHeap(item.cursor)) { PositionImpl previousSlowestConsumer = heap.get(0).position; // When the cursor moves forward, we need to push it toward the @@ -146,7 +170,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> { } PositionImpl newSlowestConsumer = heap.get(0).position; - return Pair.of(previousSlowestConsumer, newSlowestConsumer); + return Pair.of(previousSlowestConsumer, newSlowestConsumer); } return null; } finally { diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 2a7f659..3419578 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -48,7 +48,6 @@ import java.util.Optional; import java.util.Queue; import java.util.Random; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -160,6 +159,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final ManagedCursorContainer cursors = new ManagedCursorContainer(); private final ManagedCursorContainer activeCursors = new ManagedCursorContainer(); + private final ManagedCursorContainer nonDurableActiveCursors = + new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor); // Ever increasing counter of entries added @VisibleForTesting @@ -2095,6 +2096,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } void doCacheEviction(long maxTimestamp) { + if (entryCache.getSize() <= 0) { + return; + } // Always remove all entries already read by active cursors PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors(); if (slowestReaderPos != null) { @@ -2106,17 +2110,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } private PositionImpl getEarlierReadPositionForActiveCursors() { - PositionImpl smallest = null; - for (ManagedCursor cursor : activeCursors) { - PositionImpl p = (PositionImpl) cursor.getReadPosition(); - if (smallest == null) { - smallest = p; - } else if (p.compareTo(smallest) < 0) { - smallest = p; - } + PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors(); + PositionImpl 
durablePosition = activeCursors.getSlowestReadPositionForActiveCursors(); + if (nonDurablePosition == null) { + return durablePosition; } - - return smallest; + if (durablePosition == null) { + return nonDurablePosition; + } + return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; } void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { @@ -3362,6 +3364,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (activeCursors.get(cursor.getName()) == null) { activeCursors.add(cursor); } + if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) { + nonDurableActiveCursors.add(cursor); + } } public void deactivateCursor(ManagedCursor cursor) { @@ -3378,6 +3383,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition())); } } + if (!cursor.isDurable()) { + nonDurableActiveCursors.removeCursor(cursor.getName()); + } } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 5ed8b29..6b9c009 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -18,12 +18,13 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.base.Predicate; import com.google.common.collect.Lists; import com.google.common.collect.Range; @@ -387,6 +388,45 @@ 
public class ManagedCursorContainerTest { } @Test + public void testSlowestReadPositionForActiveCursors() throws Exception { + ManagedCursorContainer container = + new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor); + assertNull(container.getSlowestReadPositionForActiveCursors()); + + // Add no durable cursor + PositionImpl position = PositionImpl.get(5,5); + ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position)); + doReturn(false).when(cursor1).isDurable(); + doReturn(position).when(cursor1).getReadPosition(); + container.add(cursor1); + assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5)); + + // Add no durable cursor + position = PositionImpl.get(1,1); + ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position)); + doReturn(false).when(cursor2).isDurable(); + doReturn(position).when(cursor2).getReadPosition(); + container.add(cursor2); + assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1)); + + // Move forward cursor, cursor1 = 5:5 , cursor2 = 5:6, slowest is 5:5 + position = PositionImpl.get(5,6); + container.cursorUpdated(cursor2, position); + doReturn(position).when(cursor2).getReadPosition(); + assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5)); + + // Move forward cursor, cursor1 = 5:8 , cursor2 = 5:6, slowest is 5:6 + position = PositionImpl.get(5,8); + doReturn(position).when(cursor1).getReadPosition(); + container.cursorUpdated(cursor1, position); + assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6)); + + // Remove cursor, only cursor1 left, cursor1 = 5:8 + container.removeCursor(cursor2.getName()); + assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8)); + } + + @Test public void simple() throws Exception { ManagedCursorContainer container = new ManagedCursorContainer(); 
assertNull(container.getSlowestReaderPosition());
