This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5f5b45e1735d81dc182c36e66d43165c0251572d Author: Matteo Merli <[email protected]> AuthorDate: Sun Feb 13 21:22:38 2022 -0800 Clean up individually deleted messages before the mark-delete position (#14261) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 17 ++++++++++- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 34 ++++++++++++++++++---- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 2a908974606..f4c450883a6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1517,7 +1517,9 @@ public class ManagedCursorImpl implements ManagedCursor { */ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) { - throw new IllegalArgumentException("Mark deleting an already mark-deleted position"); + throw new IllegalArgumentException( + "Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition + + " -- attempted mark delete: " + newMarkDeletePosition); } PositionImpl oldMarkDeletePosition = markDeletePosition; @@ -1914,6 +1916,19 @@ public class ManagedCursorImpl implements ManagedCursor { // mark-delete to the upper bound of the first range segment Range<PositionImpl> range = individualDeletedMessages.firstRange(); + // If the upper bound is before the mark-delete position, we need to move ahead as these + // individualDeletedMessages are now irrelevant + if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) { + individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(), + markDeletePosition.getEntryId()); + range = individualDeletedMessages.firstRange(); + } + + if (range == null) { + // The set was completely cleaned up now + return; + } + // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 5a56026f0ef..b6c40bcfc69 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -29,11 +29,11 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.base.Charsets; import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.Sets; +import io.netty.buffer.ByteBufAllocator; import java.lang.reflect.Field; import java.nio.charset.Charset; import java.time.Duration; @@ -45,7 +45,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -58,8 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; - -import io.netty.buffer.ByteBufAllocator; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -85,8 +82,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; -import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; +import org.apache.pulsar.common.util.collections.LongPairRangeSet; +import org.apache.pulsar.metadata.api.Stat; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; import org.awaitility.Awaitility; @@ -3417,7 +3415,6 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { ManagedCursor c2 = ledger2.openCursor("c"); assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); - } finally { factory2.shutdown(); } @@ -3427,6 +3424,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { dirtyFactory.shutdown(); } + @Test + public void testConsistencyOfIndividualMessages() throws Exception { + ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages"); + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c"); + + PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]); + c1.markDelete(p1); + + // Artificially add a position that is before the current mark-delete position + LongPairRangeSet<PositionImpl> idm = c1.getIndividuallyDeletedMessagesSet(); + idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10); + + List<Position> positions = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + positions.add(ledger1.addEntry(new byte[1024])); + } + + for (int i = 0; i < 20; i++) { + c1.delete(positions.get(i)); + } + + assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0); + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + } + @Test public void testCursorGetBacklog() throws Exception { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
