This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 887e0d1 Clean up individually deleted messages before the mark-delete position (#14261)
887e0d1 is described below
commit 887e0d1d2b279d36832797e0ca54bc458c4f2bb0
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Feb 13 21:22:38 2022 -0800
Clean up individually deleted messages before the mark-delete position (#14261)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 17 +++++++++++++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 26 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 61a5aa3..b775978 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1579,7 +1579,9 @@ public class ManagedCursorImpl implements ManagedCursor {
*/
PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
- throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
+ throw new IllegalArgumentException(
+ "Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition
+ + " -- attempted mark delete: " + newMarkDeletePosition);
}
PositionImpl oldMarkDeletePosition = markDeletePosition;
@@ -1999,6 +2001,19 @@ public class ManagedCursorImpl implements ManagedCursor {
// mark-delete to the upper bound of the first range segment
Range<PositionImpl> range = individualDeletedMessages.firstRange();
+ // If the upper bound is before the mark-delete position, we need to move ahead as these
+ // individualDeletedMessages are now irrelevant
+ if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
+ individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
+ markDeletePosition.getEntryId());
+ range = individualDeletedMessages.firstRange();
+ }
+
+ if (range == null) {
+ // The set was completely cleaned up now
+ return;
+ }
+
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1c8960b..84a5d66 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
@@ -3512,6 +3513,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
@Test
+ public void testConsistencyOfIndividualMessages() throws Exception {
+ ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c");
+
+ PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]);
+ c1.markDelete(p1);
+
+ // Artificially add a position that is before the current mark-delete position
+ LongPairRangeSet<PositionImpl> idm = c1.getIndividuallyDeletedMessagesSet();
+ idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10);
+
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ positions.add(ledger1.addEntry(new byte[1024]));
+ }
+
+ for (int i = 0; i < 20; i++) {
+ c1.delete(positions.get(i));
+ }
+
+ assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0);
+ assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1));
+ }
+
+ @Test
public void testCursorCheckReadPositionChanged() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
ManagedCursor c1 = ledger.openCursor("c1");