This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c5ba529  [pulsar-broker] Fix expiry monitor to continue on 
non-recoverable error (#4818)
c5ba529 is described below

commit c5ba52983fee994de61984aae7d1757e9b738caf
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Wed Aug 14 07:04:20 2019 -0700

    [pulsar-broker] Fix expiry monitor to continue on non-recoverable error 
(#4818)
    
    ### Motivation
    
    In #1046, we have added a flag (`autoSkipNonRecoverableData`) and mechanism 
to recover cursor if ledger data is deleted. However, expiery-monitor doesn't 
use that flag and it gets stuck when it finds non-recoverable ml-error while 
cleaning up expired message.
    
    ### Modification
    Expiry-monitor can skip non-recoverable managed-ledger exception (eg: 
data/ledger doesn't exist anymore) when `autoSkipNonRecoverableData` flag is 
enabled.
---
 .../apache/bookkeeper/mledger/AsyncCallbacks.java  |  3 +-
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  7 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 15 +++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 +
 .../bookkeeper/mledger/impl/OpFindNewest.java      |  5 +-
 .../mledger/impl/ManagedCursorContainerTest.java   |  7 ++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  4 +-
 .../persistent/PersistentMessageExpiryMonitor.java | 14 +++-
 .../persistent/PersistentMessageFinder.java        |  7 +-
 .../service/persistent/PersistentSubscription.java |  3 +-
 .../service/PersistentMessageFinderTest.java       | 89 +++++++++++++++++++---
 11 files changed, 136 insertions(+), 22 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index e861fad..8a21385 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
 
 import com.google.common.annotations.Beta;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Definition of all the callbacks used for the ManagedLedger asynchronous API.
@@ -116,7 +117,7 @@ public interface AsyncCallbacks {
     interface FindEntryCallback {
         void findEntryComplete(Position position, Object ctx);
 
-        void findEntryFailed(ManagedLedgerException exception, Object ctx);
+        void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx);
     }
 
     interface ResetCursorCallback {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index fc3f6a4..03a68ba 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -587,4 +587,11 @@ public interface ManagedCursor {
      */
     void setThrottleMarkDelete(double throttleMarkDelete);
 
+    /**
+     * Get {@link ManagedLedger} attached with cursor
+     * 
+     * @return ManagedLedger
+     */
+    ManagedLedger getManagedLedger();
+
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d185274..5292688 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -45,6 +45,7 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -71,6 +72,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
@@ -766,7 +768,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             }
 
             @Override
-            public void findEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition,
+                    Object ctx) {
                 result.exception = exception;
                 counter.countDown();
             }
@@ -796,11 +799,12 @@ public class ManagedCursorImpl implements ManagedCursor {
             max = getNumberOfEntriesInStorage();
             break;
         default:
-            callback.findEntryFailed(new ManagedLedgerException("Unknown 
position constraint"), ctx);
+            callback.findEntryFailed(new ManagedLedgerException("Unknown 
position constraint"), Optional.empty(), ctx);
             return;
         }
         if (startPosition == null) {
-            callback.findEntryFailed(new ManagedLedgerException("Couldn't find 
start position"), ctx);
+            callback.findEntryFailed(new ManagedLedgerException("Couldn't find 
start position"),
+                    Optional.empty(), ctx);
             return;
         }
         op = new OpFindNewest(this, startPosition, condition, max, callback, 
ctx);
@@ -2581,5 +2585,10 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    @Override
+    public ManagedLedger getManagedLedger() {
+        return this.ledger;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 749e560..a1a5a15 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -49,6 +49,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -3023,6 +3024,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     public static ManagedLedgerException 
createManagedLedgerException(Throwable t) {
         if (t instanceof org.apache.bookkeeper.client.api.BKException) {
             return 
createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) 
t).getCode());
+        } else if (t instanceof CompletionException
+                && !(t.getCause() instanceof CompletionException) /* check to 
avoid stackoverlflow */) {
+            return createManagedLedgerException(t.getCause());
         } else {
             return new ManagedLedgerException("Unknown exception");
         }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 57e8044..4bce569 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -21,6 +21,9 @@ package org.apache.bookkeeper.mledger.impl;
 import com.google.common.base.Predicate;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+
+import java.util.Optional;
+
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
@@ -107,7 +110,7 @@ class OpFindNewest implements ReadEntryCallback {
 
     @Override
     public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-        callback.findEntryFailed(exception, OpFindNewest.this.ctx);
+        callback.findEntryFailed(exception, 
Optional.ofNullable(searchPosition), OpFindNewest.this.ctx);
     }
 
     public void find() {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 60d3dc9..c415320 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -40,6 +40,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.testng.annotations.Test;
@@ -308,6 +309,12 @@ public class ManagedCursorContainerTest {
         public double getThrottleMarkDelete() {
             return -1;
         }
+
+        @Override
+        public ManagedLedger getManagedLedger() {
+            return null;
+        }
+
     }
 
     @Test
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4f2cce5..70db43c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -2077,7 +2078,8 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
             }
 
             @Override
-            public void findEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition,
+                    Object ctx) {
                 result.exception = exception;
                 counter.countDown();
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 85d3785..deb2744 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
+import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
@@ -37,6 +39,7 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
     private final String subName;
     private final String topicName;
     private final Rate msgExpired;
+    private final boolean autoSkipNonRecoverableData;
 
     private static final int FALSE = 0;
     private static final int TRUE = 1;
@@ -50,6 +53,9 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
         this.cursor = cursor;
         this.subName = subscriptionName;
         this.msgExpired = new Rate();
+        this.autoSkipNonRecoverableData = cursor.getManagedLedger() != null  
// check to avoid test failures
+                ? 
cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
+                : false;
     }
 
     public void expireMessages(int messageTTLInSeconds) {
@@ -124,10 +130,16 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
     }
 
     @Override
-    public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+    public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Finding expired entry operation failed", 
topicName, subName, exception);
         }
+        if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
+                && (exception instanceof NonRecoverableLedgerException)) {
+            log.warn("[{}][{}] read failed from ledger at position:{} : {}", 
topicName, subName, failedReadPosition,
+                    exception.getMessage());
+            findEntryComplete(failedReadPosition.get(), ctx);
+        }
         expirationCheckInProgress = FALSE;
         updateRates();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index a088200..9e75149 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -83,7 +84,7 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
             }
             callback.findEntryFailed(
                     new 
ManagedLedgerException.ConcurrentFindCursorPositionException("last find is 
still running"),
-                    null);
+                    Optional.empty(), null);
         }
     }
 
@@ -106,7 +107,7 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
     }
 
     @Override
-    public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+    public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
         checkArgument(ctx instanceof AsyncCallbacks.FindEntryCallback);
         AsyncCallbacks.FindEntryCallback callback = 
(AsyncCallbacks.FindEntryCallback) ctx;
         if (log.isDebugEnabled()) {
@@ -114,6 +115,6 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
                     timestamp, exception);
         }
         messageFindInProgress = FALSE;
-        callback.findEntryFailed(exception, null);
+        callback.findEntryFailed(exception, failedReadPosition, null);
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index d4adf45..34864a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -593,7 +594,7 @@ public class PersistentSubscription implements Subscription 
{
             }
 
             @Override
-            public void findEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
                 // todo - what can go wrong here that needs to be retried?
                 if (exception instanceof 
ConcurrentFindCursorPositionException) {
                     future.completeExceptionally(new 
SubscriptionBusyException(exception.getMessage()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index ae30b29..086c087 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -29,6 +29,7 @@ import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.lang.reflect.Field;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +42,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -98,7 +103,8 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
             }
 
             @Override
-            public void findEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition,
+                    Object ctx) {
                 result.exception = exception;
                 future.completeExceptionally(exception);
             }
@@ -167,20 +173,23 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
         PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1);
         final AtomicBoolean ex = new AtomicBoolean(false);
-        messageFinder.findEntryFailed(new ManagedLedgerException("failed"), 
new AsyncCallbacks.FindEntryCallback() {
-            @Override
-            public void findEntryComplete(Position position, Object ctx) {
-            }
+        messageFinder.findEntryFailed(new ManagedLedgerException("failed"), 
Optional.empty(),
+                new AsyncCallbacks.FindEntryCallback() {
+                    @Override
+                    public void findEntryComplete(Position position, Object 
ctx) {
+                    }
 
-            @Override
-            public void findEntryFailed(ManagedLedgerException exception, 
Object ctx) {
-                ex.set(true);
-            }
-        });
+                    @Override
+                    public void findEntryFailed(ManagedLedgerException 
exception, Optional<Position> failedReadPosition,
+                            Object ctx) {
+                        ex.set(true);
+                    }
+                });
         assertTrue(ex.get());
 
         PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
-        monitor.findEntryFailed(new 
ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), null);
+        monitor.findEntryFailed(new 
ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
+                Optional.empty(), null);
         Field field = 
monitor.getClass().getDeclaredField("expirationCheckInProgress");
         field.setAccessible(true);
         assertEquals(0, field.get(monitor));
@@ -190,4 +199,62 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         ledger.close();
         factory.shutdown();
     }
+    
+    /**
+     * It tests that message expiry doesn't get stuck if it can't read deleted 
ledger's entry.
+     * 
+     * @throws Exception
+     */
+    @Test
+    void testMessageExpiryWithNonRecoverableException() throws Exception {
+
+        final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithNonRecoverableLedgers";
+        final int entriesPerLedger = 2;
+        final int totalEntries = 10;
+        final int ttlSeconds = 1;
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        config.setAutoSkipNonRecoverableData(true);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+
+        for (int i = 0; i < totalEntries; i++) {
+            ledger.addEntry(createMessageWrittenToLedger("msg" + i));
+        }
+
+        List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
+        LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
+
+        assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
+
+        // this will make sure that all entries should be deleted
+        Thread.sleep(ttlSeconds);
+
+        bkc.deleteLedger(ledgers.get(0).getLedgerId());
+        bkc.deleteLedger(ledgers.get(1).getLedgerId());
+        bkc.deleteLedger(ledgers.get(2).getLedgerId());
+
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
+        Position previousMarkDelete = null;
+        for (int i = 0; i < totalEntries; i++) {
+            monitor.expireMessages(1);
+            Position previousPos = previousMarkDelete;
+            retryStrategically(
+                    (test) -> c1.getMarkDeletedPosition() != null && 
!c1.getMarkDeletedPosition().equals(previousPos),
+                    5, 100);
+            previousMarkDelete = c1.getMarkDeletedPosition();
+        }
+
+        PositionImpl markDeletePosition = (PositionImpl) 
c1.getMarkDeletedPosition();
+        assertEquals(lastLedgerInfo.getLedgerId(), 
markDeletePosition.getLedgerId());
+        assertEquals(lastLedgerInfo.getEntries() - 1, 
markDeletePosition.getEntryId());
+
+        c1.close();
+        ledger.close();
+        factory.shutdown();
+
+    }
 }

Reply via email to