codelipenghui commented on code in PR #17273:
URL: https://github.com/apache/pulsar/pull/17273#discussion_r956049237


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2182,63 +2180,33 @@ public boolean hasMoreEntries(PositionImpl position) {
         return result;
     }
 
-    void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) {
-        Pair<PositionImpl, PositionImpl> pair = activeCursors.cursorUpdated(cursor, newPosition);
-        if (pair != null) {
-            entryCache.invalidateEntries(pair.getRight());
+    void doCacheEviction(long maxTimestamp) {
+        if (entryCache.getSize() > 0) {
+            entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);

Review Comment:
   It looks like we should call `invalidateEntriesUpToSlowestReaderPosition`
before this line. The previous timestamp-based eviction first evicted entries
up to the slowest reader position and only then evicted by timestamp. I don't
expect much difference in behavior, but evicting by position looks more
efficient, so we should evict up to the slowest reader position as much as
possible before falling back to the timestamp.
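
   To make the suggested ordering concrete, here is a minimal sketch. It is not
the Pulsar implementation: `SketchEntryCache` and `SketchEviction` are
hypothetical stand-ins, positions are simplified to `long` values, and the
exclusive upper bound in `invalidateEntries` is an assumption of the sketch.

```java
import java.util.TreeMap;

// Hypothetical stand-in for the entry cache; NOT the real Pulsar EntryCache.
class SketchEntryCache {
    // position (simplified to a long) -> insertion timestamp
    private final TreeMap<Long, Long> entries = new TreeMap<>();

    void insert(long position, long timestamp) {
        entries.put(position, timestamp);
    }

    int size() {
        return entries.size();
    }

    // Assumption: drops every entry strictly before the given position.
    void invalidateEntries(long upToPosition) {
        entries.headMap(upToPosition, false).clear();
    }

    // Drops every entry inserted before the given timestamp.
    void invalidateEntriesBeforeTimestamp(long maxTimestamp) {
        entries.values().removeIf(ts -> ts < maxTimestamp);
    }
}

// Hypothetical helper showing the order suggested in the comment above.
class SketchEviction {
    static void doCacheEviction(SketchEntryCache cache, Long slowestReaderPosition, long maxTimestamp) {
        if (cache.size() <= 0) {
            return;
        }
        // First: evict by position (a single range removal in this sketch).
        if (slowestReaderPosition != null) {
            cache.invalidateEntries(slowestReaderPosition);
        }
        // Then: evict whatever remains that is older than the cutoff.
        cache.invalidateEntriesBeforeTimestamp(maxTimestamp);
    }
}
```

   In this sketch the timestamp pass only has to look at entries that survived
the position pass, which is the efficiency argument made above.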



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2182,63 +2180,33 @@ public boolean hasMoreEntries(PositionImpl position) {
         return result;
     }
 
-    void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl newPosition) {
-        Pair<PositionImpl, PositionImpl> pair = activeCursors.cursorUpdated(cursor, newPosition);
-        if (pair != null) {
-            entryCache.invalidateEntries(pair.getRight());
+    void doCacheEviction(long maxTimestamp) {
+        if (entryCache.getSize() > 0) {
+            entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
         }
     }
 
-    public PositionImpl getEvictionPosition(){
-        PositionImpl evictionPos;
-        if (config.isCacheEvictionByMarkDeletedPosition()) {
-            PositionImpl earlierMarkDeletedPosition = getEarlierMarkDeletedPositionForActiveCursors();
-            evictionPos = earlierMarkDeletedPosition != null ? earlierMarkDeletedPosition.getNext() : null;
-        } else {
-            // Always remove all entries already read by active cursors
-            evictionPos = getEarlierReadPositionForActiveCursors();
-        }
-        return evictionPos;
-    }
-    void doCacheEviction(long maxTimestamp) {
+    // slowest reader position is earliest mark delete position when cacheEvictionByMarkDeletedPosition=true
+    // it is the earliest read position when cacheEvictionByMarkDeletedPosition=false
+    private void invalidateEntriesUpToSlowestReaderPosition() {
         if (entryCache.getSize() <= 0) {
             return;
         }
-        PositionImpl evictionPos = getEvictionPosition();
-        if (evictionPos != null) {
-            entryCache.invalidateEntries(evictionPos);
-        }
-
-        // Remove entries older than the cutoff threshold
-        entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
-    }
-
-    private PositionImpl getEarlierReadPositionForActiveCursors() {
-        PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
-        PositionImpl durablePosition = activeCursors.getSlowestReadPositionForActiveCursors();
-        if (nonDurablePosition == null) {
-            return durablePosition;
-        }
-        if (durablePosition == null) {
-            return nonDurablePosition;
-        }
-        return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
-    }
-
-    private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
-        PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
-        PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
-        if (nonDurablePosition == null) {
-            return durablePosition;
-        }
-        if (durablePosition == null) {
-            return nonDurablePosition;
+        if (!activeCursors.isEmpty()) {
+            PositionImpl evictionPos = activeCursors.getSlowestReaderPosition();
+            if (evictionPos != null) {
+                entryCache.invalidateEntries(evictionPos);
+            }
+        } else {
+            entryCache.clear();
         }
-        return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
     }
 
     void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
         Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
+        if (config.isCacheEvictionByMarkDeletedPosition()) {
+            updateActiveCursor(cursor, newPosition);
+        }

Review Comment:
   It would be better to rename this method to
`onCursorMarkDeletePositionUpdated()` and rename `updateReadPosition` to
`onCursorReadPositionUpdated()`, which would make the code easier to read.
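
   A rough sketch of how the two renamed callbacks might read side by side. The
class below is hypothetical: cursors are plain names, positions are `long`
values, and the `!evictByMarkDelete` branch in the read-position callback is
an assumption inferred from the comment in the diff, not code shown in this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified tracker; NOT the real ManagedLedgerImpl.
class SketchCursorTracker {
    // Stands in for config.isCacheEvictionByMarkDeletedPosition().
    private final boolean evictByMarkDelete;
    private final Map<String, Long> activeCursorPositions = new HashMap<>();

    SketchCursorTracker(boolean evictByMarkDelete) {
        this.evictByMarkDelete = evictByMarkDelete;
    }

    // Proposed name for updateCursor: the mark-delete position moved.
    void onCursorMarkDeletePositionUpdated(String cursor, long newPosition) {
        if (evictByMarkDelete) {
            activeCursorPositions.put(cursor, newPosition);
        }
    }

    // Proposed name for updateReadPosition: the read position moved.
    // Assumption: it tracks the cursor only when eviction is by read position.
    void onCursorReadPositionUpdated(String cursor, long newPosition) {
        if (!evictByMarkDelete) {
            activeCursorPositions.put(cursor, newPosition);
        }
    }

    // Smallest tracked position, or null when nothing is tracked.
    Long slowestReaderPosition() {
        return activeCursorPositions.values().stream().min(Long::compare).orElse(null);
    }
}
```

   With the `on...Updated` names, each call site states which event occurred,
and the config check decides whether that event feeds the eviction position.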



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.