Jason918 commented on a change in pull request #14985:
URL: https://github.com/apache/pulsar/pull/14985#discussion_r841072182



##########
File path: 
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
##########
@@ -291,6 +291,189 @@ public void acknowledge1() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
+        final CountDownLatch counter = new CountDownLatch(1);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true);
+        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+                .toNanos(30000));
+        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+            @Override
+            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() 
{
+                    @Override
+                    public void openCursorComplete(ManagedCursor cursor, 
Object ctx) {
+                        ManagedLedger ledger = (ManagedLedger) ctx;
+                        String message1 = "test";
+                        ledger.asyncAddEntry(message1.getBytes(Encoding), new 
AddEntryCallback() {
+                            @Override
+                            public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
+                                @SuppressWarnings("unchecked")
+                                Pair<ManagedLedger, ManagedCursor> pair = 
(Pair<ManagedLedger, ManagedCursor>) ctx;
+                                ManagedLedger ledger = pair.getLeft();
+                                ManagedCursor cursor = pair.getRight();
+                                assertEquals(((ManagedLedgerImpl) 
ledger).getCacheSize(), message1.getBytes(Encoding).length);
+
+                                cursor.asyncReadEntries(1, new 
ReadEntriesCallback() {
+                                    @Override
+                                    public void 
readEntriesComplete(List<Entry> entries, Object ctx) {
+                                        ManagedCursor cursor = (ManagedCursor) 
ctx;
+                                        assertEquals(entries.size(), 1);
+                                        Entry entry = entries.get(0);
+                                        final Position position = 
entry.getPosition();
+                                        assertEquals(new 
String(entry.getDataAndRelease(), Encoding), message1);
+                                        assertEquals(((ManagedLedgerImpl) 
ledger).getCacheSize(), message1.getBytes(Encoding).length);
+
+                                        log.debug("Mark-Deleting to position 
{}", position);
+                                        cursor.asyncMarkDelete(position, new 
MarkDeleteCallback() {
+                                            @Override
+                                            public void 
markDeleteComplete(Object ctx) {
+                                                log.debug("Mark delete 
complete");
+                                                ManagedCursor cursor = 
(ManagedCursor) ctx;
+                                                
assertFalse(cursor.hasMoreEntries());
+                                                // wait eviction  finish.
+                                                try {

Review comment:
       It's better to trigger `ManagedLedgerImpl#doCacheEviction` manually 
instead of sleeping, since a fixed sleep can make the test flaky.
   Alternatively, just use `Awaitility.await().untilAsserted`.

##########
File path: 
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
##########
@@ -291,6 +291,189 @@ public void acknowledge1() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
+        final CountDownLatch counter = new CountDownLatch(1);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setCacheEvictionByMarkDeletedPosition(true);
+        factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+                .toNanos(30000));
+        factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+            @Override
+            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() 
{
+                    @Override
+                    public void openCursorComplete(ManagedCursor cursor, 
Object ctx) {
+                        ManagedLedger ledger = (ManagedLedger) ctx;
+                        String message1 = "test";
+                        ledger.asyncAddEntry(message1.getBytes(Encoding), new 
AddEntryCallback() {
+                            @Override
+                            public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
+                                @SuppressWarnings("unchecked")
+                                Pair<ManagedLedger, ManagedCursor> pair = 
(Pair<ManagedLedger, ManagedCursor>) ctx;
+                                ManagedLedger ledger = pair.getLeft();
+                                ManagedCursor cursor = pair.getRight();
+                                assertEquals(((ManagedLedgerImpl) 
ledger).getCacheSize(), message1.getBytes(Encoding).length);
+
+                                cursor.asyncReadEntries(1, new 
ReadEntriesCallback() {
+                                    @Override
+                                    public void 
readEntriesComplete(List<Entry> entries, Object ctx) {
+                                        ManagedCursor cursor = (ManagedCursor) 
ctx;
+                                        assertEquals(entries.size(), 1);
+                                        Entry entry = entries.get(0);
+                                        final Position position = 
entry.getPosition();
+                                        assertEquals(new 
String(entry.getDataAndRelease(), Encoding), message1);
+                                        assertEquals(((ManagedLedgerImpl) 
ledger).getCacheSize(), message1.getBytes(Encoding).length);

Review comment:
       A manual trigger of `ManagedLedgerImpl#doCacheEviction` is needed before this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to