This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5f5b45e1735d81dc182c36e66d43165c0251572d
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Feb 13 21:22:38 2022 -0800

    Clean up individually deleted messages before the mark-delete position (#14261)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 17 ++++++++++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 34 ++++++++++++++++++----
 2 files changed, 44 insertions(+), 7 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2a908974606..f4c450883a6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1517,7 +1517,9 @@ public class ManagedCursorImpl implements ManagedCursor {
      */
     PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
         if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
-            throw new IllegalArgumentException("Mark deleting an already 
mark-deleted position");
+            throw new IllegalArgumentException(
+                    "Mark deleting an already mark-deleted position. Current 
mark-delete: " + markDeletePosition
+                            + " -- attempted mark delete: " + 
newMarkDeletePosition);
         }
 
         PositionImpl oldMarkDeletePosition = markDeletePosition;
@@ -1914,6 +1916,19 @@ public class ManagedCursorImpl implements ManagedCursor {
             // mark-delete to the upper bound of the first range segment
             Range<PositionImpl> range = individualDeletedMessages.firstRange();
 
+            // If the upper bound is before the mark-delete position, we need to move ahead as these
+            // individualDeletedMessages are now irrelevant
+            if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
+                
individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
+                        markDeletePosition.getEntryId());
+                range = individualDeletedMessages.firstRange();
+            }
+
+            if (range == null) {
+                // The set was completely cleaned up now
+                return;
+            }
+
             // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
             if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || 
ledger
                     .getNumberOfEntries(Range.openClosed(markDeletePosition, 
range.lowerEndpoint())) <= 0) {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5a56026f0ef..b6c40bcfc69 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -29,11 +29,11 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBufAllocator;
 import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.time.Duration;
@@ -45,7 +45,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -58,8 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-
-import io.netty.buffer.ByteBufAllocator;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -85,8 +82,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
+import org.apache.pulsar.metadata.api.Stat;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.awaitility.Awaitility;
@@ -3417,7 +3415,6 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
                         ManagedCursor c2 = ledger2.openCursor("c");
 
                         assertEquals(c2.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
-
                     } finally {
                         factory2.shutdown();
                     }
@@ -3427,6 +3424,31 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         dirtyFactory.shutdown();
     }
 
+    @Test
+    public void testConsistencyOfIndividualMessages() throws Exception {
+        ManagedLedger ledger1 = 
factory.open("testConsistencyOfIndividualMessages");
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c");
+
+        PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]);
+        c1.markDelete(p1);
+
+        // Artificially add a position that is before the current mark-delete position
+        LongPairRangeSet<PositionImpl> idm = 
c1.getIndividuallyDeletedMessagesSet();
+        idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        for (int i = 0; i < 20; i++) {
+            c1.delete(positions.get(i));
+        }
+
+        assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0);
+        assertEquals(c1.getMarkDeletedPosition(), 
positions.get(positions.size() - 1));
+    }
+
     @Test
     public void testCursorGetBacklog() throws Exception {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();

Reply via email to