This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 44aaa47b5fd681e05a1fa19d201c9181a0d63864
Author: feynmanlin <[email protected]>
AuthorDate: Thu Sep 30 21:45:19 2021 +0800

    Optimize the memory usage of Cache Eviction (#12045)
    
    
    (cherry picked from commit 0c22e0fab596314bd5462f607ce6d03cb96ed484)
---
 .../mledger/impl/ManagedCursorContainer.java       | 38 ++++++++++++++++----
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 30 ++++++++++------
 .../mledger/impl/ManagedCursorContainerTest.java   | 42 +++++++++++++++++++++-
 3 files changed, 91 insertions(+), 19 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index db887f0..848ce54 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -60,8 +60,23 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         }
     }
 
-    // Used to keep track of slowest cursor. Contains all of all the cursors except for non-durable cursors
-    // Since we do need to keep track of non-durable cursors.
+    public enum CursorType {
+        DurableCursor,
+        NonDurableCursor,
+        ALL
+    }
+
+    public ManagedCursorContainer() {
+        cursorType = CursorType.DurableCursor;
+    }
+
+    public ManagedCursorContainer(CursorType cursorType) {
+        this.cursorType = cursorType;
+    }
+
+    private final CursorType cursorType;
+
+    // Used to keep track of slowest cursor. Contains all of all active cursors.
     private final ArrayList<Item> heap = Lists.newArrayList();
 
     // Maps a cursor to its position in the heap
@@ -76,8 +91,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
             Item item = new Item(cursor, heap.size());
             cursors.put(cursor.getName(), item);
 
-            // don't need to add non-durable cursors
-            if (cursor.isDurable()) {
+            if (shouldTrackInHeap(cursor)) {
                 heap.add(item);
                 siftUp(item);
             }
@@ -86,6 +100,16 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         }
     }
 
+    private boolean shouldTrackInHeap(ManagedCursor cursor) {
+        return CursorType.ALL.equals(cursorType)
+                || (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
+                || (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
+    }
+
+    public PositionImpl getSlowestReadPositionForActiveCursors() {
+        return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
+    }
+
     public ManagedCursor get(String name) {
         long stamp = rwLock.readLock();
         try {
@@ -101,7 +125,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         try {
             Item item = cursors.remove(name);
 
-            if (item.cursor.isDurable()) {
+            if (shouldTrackInHeap(item.cursor)) {
                 // Move the item to the right end of the heap to be removed
                 Item lastItem = heap.get(heap.size() - 1);
                 swap(item, lastItem);
@@ -132,7 +156,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
             }
 
 
-            if (item.cursor.isDurable()) {
+            if (shouldTrackInHeap(item.cursor)) {
                 PositionImpl previousSlowestConsumer = heap.get(0).position;
 
                 // When the cursor moves forward, we need to push it toward the
@@ -146,7 +170,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
                 }
 
                 PositionImpl newSlowestConsumer = heap.get(0).position;
-                    return Pair.of(previousSlowestConsumer, newSlowestConsumer);
+                return Pair.of(previousSlowestConsumer, newSlowestConsumer);
             }
             return null;
         } finally {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2a7f659..3419578 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -48,7 +48,6 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -160,6 +159,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private final ManagedCursorContainer cursors = new ManagedCursorContainer();
     private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
+    private final ManagedCursorContainer nonDurableActiveCursors =
+            new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
 
     // Ever increasing counter of entries added
     @VisibleForTesting
@@ -2095,6 +2096,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     void doCacheEviction(long maxTimestamp) {
+        if (entryCache.getSize() <= 0) {
+            return;
+        }
         // Always remove all entries already read by active cursors
         PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors();
         if (slowestReaderPos != null) {
@@ -2106,17 +2110,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     private PositionImpl getEarlierReadPositionForActiveCursors() {
-        PositionImpl smallest = null;
-        for (ManagedCursor cursor : activeCursors) {
-            PositionImpl p = (PositionImpl) cursor.getReadPosition();
-            if (smallest == null) {
-                smallest = p;
-            } else if (p.compareTo(smallest) < 0) {
-                smallest = p;
-            }
+        PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
+        PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
+        if (nonDurablePosition == null) {
+            return durablePosition;
         }
-
-        return smallest;
+        if (durablePosition == null) {
+            return nonDurablePosition;
+        }
+        return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
     }
 
     void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
@@ -3362,6 +3364,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (activeCursors.get(cursor.getName()) == null) {
             activeCursors.add(cursor);
         }
+        if (!cursor.isDurable() && nonDurableActiveCursors.get(cursor.getName()) == null) {
+            nonDurableActiveCursors.add(cursor);
+        }
     }
 
     public void deactivateCursor(ManagedCursor cursor) {
@@ -3378,6 +3383,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                             getPreviousPosition((PositionImpl) activeCursors.getSlowestReader().getReadPosition()));
                 }
             }
+            if (!cursor.isDurable()) {
+                nonDurableActiveCursors.removeCursor(cursor.getName());
+            }
         }
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 5ed8b29..6b9c009 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -18,12 +18,13 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
@@ -387,6 +388,45 @@ public class ManagedCursorContainerTest {
     }
 
     @Test
+    public void testSlowestReadPositionForActiveCursors() throws Exception {
+        ManagedCursorContainer container =
+                new ManagedCursorContainer(ManagedCursorContainer.CursorType.NonDurableCursor);
+        assertNull(container.getSlowestReadPositionForActiveCursors());
+
+        // Add no durable cursor
+        PositionImpl position = PositionImpl.get(5,5);
+        ManagedCursor cursor1 = spy(new MockManagedCursor(container, "test1", position));
+        doReturn(false).when(cursor1).isDurable();
+        doReturn(position).when(cursor1).getReadPosition();
+        container.add(cursor1);
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+
+        // Add no durable cursor
+        position = PositionImpl.get(1,1);
+        ManagedCursor cursor2 = spy(new MockManagedCursor(container, "test2", position));
+        doReturn(false).when(cursor2).isDurable();
+        doReturn(position).when(cursor2).getReadPosition();
+        container.add(cursor2);
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(1, 1));
+
+        // Move forward cursor, cursor1 = 5:5 , cursor2 = 5:6, slowest is 5:5
+        position = PositionImpl.get(5,6);
+        container.cursorUpdated(cursor2, position);
+        doReturn(position).when(cursor2).getReadPosition();
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 5));
+
+        // Move forward cursor, cursor1 = 5:8 , cursor2 = 5:6, slowest is 5:6
+        position = PositionImpl.get(5,8);
+        doReturn(position).when(cursor1).getReadPosition();
+        container.cursorUpdated(cursor1, position);
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 6));
+
+        // Remove cursor, only cursor1 left, cursor1 = 5:8
+        container.removeCursor(cursor2.getName());
+        assertEquals(container.getSlowestReadPositionForActiveCursors(), new PositionImpl(5, 8));
+    }
+
+    @Test
     public void simple() throws Exception {
         ManagedCursorContainer container = new ManagedCursorContainer();
         assertNull(container.getSlowestReaderPosition());

Reply via email to