This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 7f12634fd7b Add a cache eviction policy:Evicting cache data by the slowest markDeletedPosition (#14985)
7f12634fd7b is described below

commit 7f12634fd7b1bf6e4bd9876f8c51490a6780ac36
Author: LinChen <[email protected]>
AuthorDate: Wed Apr 6 15:53:58 2022 +0800

    Add a cache eviction policy:Evicting cache data by the slowest markDeletedPosition (#14985)
    
    (cherry picked from commit 9b36dcd777b7e3b159b57cd409784a3622483132)
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |   5 +
 .../mledger/impl/ManagedCursorContainer.java       |   4 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  25 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 198 +++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |   9 +-
 .../pulsar/broker/service/BrokerService.java       |   2 +
 6 files changed, 238 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 2f4e098d677..d7d3b1c02cd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.api.DigestType;
 
@@ -77,6 +79,9 @@ public class ManagedLedgerConfig {
     private int newEntriesCheckDelayInMillis = 10;
     private Clock clock = Clock.systemUTC();
     private ManagedLedgerInterceptor managedLedgerInterceptor;
+    @Getter
+    @Setter
+    private boolean cacheEvictionByMarkDeletedPosition = false;
 
     public boolean isCreateIfMissing() {
         return createIfMissing;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index c631cdcf96d..4c3d4134379 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -108,6 +108,10 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         return heap.isEmpty() ? null : (PositionImpl) 
heap.get(0).cursor.getReadPosition();
     }
 
+    public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
+        return heap.isEmpty() ? null : (PositionImpl) 
heap.get(0).cursor.getMarkDeletedPosition();
+    }
+
     public ManagedCursor get(String name) {
         long stamp = rwLock.readLock();
         try {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 89e94934ded..cd47cf31ba3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2127,10 +2127,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (entryCache.getSize() <= 0) {
             return;
         }
-        // Always remove all entries already read by active cursors
-        PositionImpl slowestReaderPos = 
getEarlierReadPositionForActiveCursors();
-        if (slowestReaderPos != null) {
-            entryCache.invalidateEntries(slowestReaderPos);
+        PositionImpl evictionPos;
+        if (config.isCacheEvictionByMarkDeletedPosition()) {
+            evictionPos = 
getEarlierMarkDeletedPositionForActiveCursors().getNext();
+        } else {
+            // Always remove all entries already read by active cursors
+            evictionPos = getEarlierReadPositionForActiveCursors();
+        }
+        if (evictionPos != null) {
+            entryCache.invalidateEntries(evictionPos);
         }
 
         // Remove entries older than the cutoff threshold
@@ -2149,6 +2154,18 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return durablePosition.compareTo(nonDurablePosition) > 0 ? 
nonDurablePosition : durablePosition;
     }
 
+    private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
+        PositionImpl nonDurablePosition = 
nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
+        PositionImpl durablePosition = 
activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
+        if (nonDurablePosition == null) {
+            return durablePosition;
+        }
+        if (durablePosition == null) {
+            return nonDurablePosition;
+        }
+        return durablePosition.compareTo(nonDurablePosition) > 0 ? 
nonDurablePosition : durablePosition;
+    }
+
     void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
         Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, 
newPosition);
         if (pair == null) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1e499aeb31b..348ee81d4e2 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -293,6 +293,204 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true);
+        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+                .toNanos(30000));
+        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+            @Override
+            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() 
{
+                    @Override
+                    public void openCursorComplete(ManagedCursor cursor, 
Object ctx) {
+                        ManagedLedger ledger = (ManagedLedger) ctx;
+                        String message1 = "test";
+                        ledger.asyncAddEntry(message1.getBytes(Encoding), new 
AddEntryCallback() {
+                            @Override
+                            public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
+                                @SuppressWarnings("unchecked")
+                                Pair<ManagedLedger, ManagedCursor> pair = 
(Pair<ManagedLedger, ManagedCursor>) ctx;
+                                ManagedLedger ledger = pair.getLeft();
+                                ManagedCursor cursor = pair.getRight();
+                                if (((ManagedLedgerImpl) 
ledger).getCacheSize() != message1.getBytes(Encoding).length) {
+                                    result.complete(false);
+                                    return;
+                                }
+                                cursor.asyncReadEntries(1, new 
ReadEntriesCallback() {
+                                    @Override
+                                    public void 
readEntriesComplete(List<Entry> entries, Object ctx) {
+                                        ManagedCursor cursor = (ManagedCursor) 
ctx;
+                                        assertEquals(entries.size(), 1);
+                                        Entry entry = entries.get(0);
+                                        final Position position = 
entry.getPosition();
+                                        if (!message1.equals(new 
String(entry.getDataAndRelease(), Encoding))) {
+                                            result.complete(false);
+                                            return;
+                                        }
+                                        ((ManagedLedgerImpl) 
ledger).doCacheEviction(
+                                                System.nanoTime() - 
TimeUnit.MILLISECONDS.toNanos(30000));
+                                        if (((ManagedLedgerImpl) 
ledger).getCacheSize() != message1.getBytes(Encoding).length) {
+                                            result.complete(false);
+                                            return;
+                                        }
+
+                                        log.debug("Mark-Deleting to position 
{}", position);
+                                        cursor.asyncMarkDelete(position, new 
MarkDeleteCallback() {
+                                            @Override
+                                            public void 
markDeleteComplete(Object ctx) {
+                                                log.debug("Mark delete 
complete");
+                                                ManagedCursor cursor = 
(ManagedCursor) ctx;
+                                                if (cursor.hasMoreEntries()) {
+                                                    result.complete(false);
+                                                    return;
+                                                }
+                                                ((ManagedLedgerImpl) 
ledger).doCacheEviction(
+                                                        System.nanoTime() - 
TimeUnit.MILLISECONDS.toNanos(30000));
+                                                
result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
+                                            }
+
+                                            @Override
+                                            public void 
markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                                                
result.completeExceptionally(exception);
+                                            }
+
+                                        }, cursor);
+                                    }
+
+                                    @Override
+                                    public void 
readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                                        
result.completeExceptionally(exception);
+                                    }
+                                }, cursor, PositionImpl.latest);
+                            }
+
+                            @Override
+                            public void addFailed(ManagedLedgerException 
exception, Object ctx) {
+                                result.completeExceptionally(exception);
+                            }
+                        }, Pair.of(ledger, cursor));
+                    }
+
+                    @Override
+                    public void openCursorFailed(ManagedLedgerException 
exception, Object ctx) {
+                        result.completeExceptionally(exception);
+                    }
+
+                }, ledger);
+            }
+
+            @Override
+            public void openLedgerFailed(ManagedLedgerException exception, 
Object ctx) {
+                result.completeExceptionally(exception);
+            }
+        }, null, null);
+
+        assertTrue(result.get());
+
+        log.info("Test completed");
+    }
+
+    @Test
+    public void testCacheEvictionByReadPosition() throws Throwable {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+                .toNanos(30000));
+        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+            @Override
+            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() 
{
+                    @Override
+                    public void openCursorComplete(ManagedCursor cursor, 
Object ctx) {
+                        ManagedLedger ledger = (ManagedLedger) ctx;
+                        String message1 = "test";
+                        ledger.asyncAddEntry(message1.getBytes(Encoding), new 
AddEntryCallback() {
+                            @Override
+                            public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
+                                @SuppressWarnings("unchecked")
+                                Pair<ManagedLedger, ManagedCursor> pair = 
(Pair<ManagedLedger, ManagedCursor>) ctx;
+                                ManagedLedger ledger = pair.getLeft();
+                                ManagedCursor cursor = pair.getRight();
+                                if (((ManagedLedgerImpl) 
ledger).getCacheSize() != message1.getBytes(Encoding).length) {
+                                    result.complete(false);
+                                    return;
+                                }
+
+                                cursor.asyncReadEntries(1, new 
ReadEntriesCallback() {
+                                    @Override
+                                    public void 
readEntriesComplete(List<Entry> entries, Object ctx) {
+                                        ManagedCursor cursor = (ManagedCursor) 
ctx;
+                                        assertEquals(entries.size(), 1);
+                                        Entry entry = entries.get(0);
+                                        final Position position = 
entry.getPosition();
+                                        if (!message1.equals(new 
String(entry.getDataAndRelease(), Encoding))) {
+                                            result.complete(false);
+                                            return;
+                                        }
+                                        ((ManagedLedgerImpl) 
ledger).doCacheEviction(
+                                                System.nanoTime() - 
TimeUnit.MILLISECONDS.toNanos(30000));
+                                        if (((ManagedLedgerImpl) 
ledger).getCacheSize() != 0) {
+                                            result.complete(false);
+                                            return;
+                                        }
+
+                                        log.debug("Mark-Deleting to position 
{}", position);
+                                        cursor.asyncMarkDelete(position, new 
MarkDeleteCallback() {
+                                            @Override
+                                            public void 
markDeleteComplete(Object ctx) {
+                                                log.debug("Mark delete 
complete");
+                                                ManagedCursor cursor = 
(ManagedCursor) ctx;
+                                                if (cursor.hasMoreEntries()) {
+                                                    result.complete(false);
+                                                    return;
+                                                }
+                                                
result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0);
+                                            }
+
+                                            @Override
+                                            public void 
markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                                                
result.completeExceptionally(exception);
+                                            }
+
+                                        }, cursor);
+                                    }
+
+                                    @Override
+                                    public void 
readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                                        
result.completeExceptionally(exception);
+                                    }
+                                }, cursor, PositionImpl.latest);
+                            }
+
+                            @Override
+                            public void addFailed(ManagedLedgerException 
exception, Object ctx) {
+                                result.completeExceptionally(exception);
+                            }
+                        }, Pair.of(ledger, cursor));
+                    }
+
+                    @Override
+                    public void openCursorFailed(ManagedLedgerException 
exception, Object ctx) {
+                        result.completeExceptionally(exception);
+                    }
+
+                }, ledger);
+            }
+
+            @Override
+            public void openLedgerFailed(ManagedLedgerException exception, 
Object ctx) {
+                result.completeExceptionally(exception);
+            }
+        }, null, null);
+
+        assertTrue(result.get());
+
+        log.info("Test completed");
+    }
+
     @Test(timeOut = 20000)
     public void asyncAPI() throws Throwable {
         final CountDownLatch counter = new CountDownLatch(1);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ee7ed9e5d7c..65dad25cf1a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2234,7 +2234,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int managedLedgerOffloadPrefetchRounds = 1;
 
-    /**** --- Transaction config variables --- ****/
+    @FieldContext(
+            category = CATEGORY_STORAGE_ML,
+            doc = "Evicting cache data by the slowest markDeletedPosition or 
readPosition. "
+                    + "The default is to evict through readPosition."
+    )
+    private boolean cacheEvictionByMarkDeletedPosition = false;
+
+    /**** --- Transaction config variables. --- ****/
     @FieldContext(
             category = CATEGORY_TRANSACTION,
             doc = "Enable transaction coordinator in broker"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 39eeed7ae02..b251b0d843b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1530,6 +1530,8 @@ public class BrokerService implements Closeable {
                     
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
                     
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
                     
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+                    managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+                            
serviceConfig.isCacheEvictionByMarkDeletedPosition());
 
                     OffloadPoliciesImpl nsLevelOffloadPolicies =
                             (OffloadPoliciesImpl) policies.map(p -> 
p.offload_policies).orElse(null);

Reply via email to