lhotari commented on code in PR #17273:
URL: https://github.com/apache/pulsar/pull/17273#discussion_r956114604
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2182,63 +2180,33 @@ public boolean hasMoreEntries(PositionImpl position) {
return result;
}
- void discardEntriesFromCache(ManagedCursorImpl cursor, PositionImpl
newPosition) {
- Pair<PositionImpl, PositionImpl> pair =
activeCursors.cursorUpdated(cursor, newPosition);
- if (pair != null) {
- entryCache.invalidateEntries(pair.getRight());
+ void doCacheEviction(long maxTimestamp) {
+ if (entryCache.getSize() > 0) {
+ entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
}
}
- public PositionImpl getEvictionPosition(){
- PositionImpl evictionPos;
- if (config.isCacheEvictionByMarkDeletedPosition()) {
- PositionImpl earlierMarkDeletedPosition =
getEarlierMarkDeletedPositionForActiveCursors();
- evictionPos = earlierMarkDeletedPosition != null ?
earlierMarkDeletedPosition.getNext() : null;
- } else {
- // Always remove all entries already read by active cursors
- evictionPos = getEarlierReadPositionForActiveCursors();
- }
- return evictionPos;
- }
- void doCacheEviction(long maxTimestamp) {
+ // slowest reader position is earliest mark delete position when
cacheEvictionByMarkDeletedPosition=true
+ // it is the earliest read position when
cacheEvictionByMarkDeletedPosition=false
+ private void invalidateEntriesUpToSlowestReaderPosition() {
if (entryCache.getSize() <= 0) {
return;
}
- PositionImpl evictionPos = getEvictionPosition();
- if (evictionPos != null) {
- entryCache.invalidateEntries(evictionPos);
- }
-
- // Remove entries older than the cutoff threshold
- entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
- }
-
- private PositionImpl getEarlierReadPositionForActiveCursors() {
- PositionImpl nonDurablePosition =
nonDurableActiveCursors.getSlowestReadPositionForActiveCursors();
- PositionImpl durablePosition =
activeCursors.getSlowestReadPositionForActiveCursors();
- if (nonDurablePosition == null) {
- return durablePosition;
- }
- if (durablePosition == null) {
- return nonDurablePosition;
- }
- return durablePosition.compareTo(nonDurablePosition) > 0 ?
nonDurablePosition : durablePosition;
- }
-
- private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
- PositionImpl nonDurablePosition =
nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
- PositionImpl durablePosition =
activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
- if (nonDurablePosition == null) {
- return durablePosition;
- }
- if (durablePosition == null) {
- return nonDurablePosition;
+ if (!activeCursors.isEmpty()) {
+ PositionImpl evictionPos =
activeCursors.getSlowestReaderPosition();
+ if (evictionPos != null) {
+ entryCache.invalidateEntries(evictionPos);
+ }
+ } else {
+ entryCache.clear();
}
- return durablePosition.compareTo(nonDurablePosition) > 0 ?
nonDurablePosition : durablePosition;
}
void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor,
newPosition);
+ if (config.isCacheEvictionByMarkDeletedPosition()) {
+ updateActiveCursor(cursor, newPosition);
+ }
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]