This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 900ba224c426cc83b61f6625a9e3e54f6ea2d7ba Author: lipenghui <[email protected]> AuthorDate: Sun Feb 7 11:18:28 2021 +0800 Fix the batch index ack persistent issue. (#9504) Fix the batch index ack persistent issue. Currently, the `batchDeletedIndexInfoBuilder` is reused for generating the batch index ack data, but its delete set is not cleared before the new delete set is added. Clear the delete set before adding the new delete set. Test added. Without this fix, the test fails. (cherry picked from commit 7916666cd579438a5bc11e749e73d6e549d7e756) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 1 + .../bookkeeper/mledger/impl/ManagedCursorTest.java | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index e9b96be..861b5ef 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2471,6 +2471,7 @@ public class ManagedCursorImpl implements ManagedCursor { for (long l : array) { deleteSet.add(l); } + batchDeletedIndexInfoBuilder.clearDeleteSet(); batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet); result.add(batchDeletedIndexInfoBuilder.build()); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 256457c..8bf0e6d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3153,7 +3153,10 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { @Test public void testBatchIndexesDeletionPersistAndRecover() throws 
ManagedLedgerException, InterruptedException { - ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent"); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + // Make sure the cursor metadata updated by the cursor ledger ID. + managedLedgerConfig.setMaxUnackedRangesToPersistInZk(-1); + ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig); ManagedCursor cursor = ledger.openCursor("c1"); final int totalEntries = 100; @@ -3163,6 +3166,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding)); } assertEquals(cursor.getNumberOfEntries(), totalEntries); + deleteBatchIndex(cursor, positions[6], 10, Lists.newArrayList(IntRange.newBuilder().setStart(1).setEnd(3).build())); deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(6).build())); deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); @@ -3170,8 +3174,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); - ledger = factory.open("test_batch_indexes_deletion_persistent"); + cursor.close(); + ledger.close(); + ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig); cursor = ledger.openCursor("c1"); + List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10); Assert.assertEquals(deletedIndexes.size(), 1); Assert.assertEquals(deletedIndexes.get(0).getStart(), 3); @@ -3181,6 +3188,11 @@ 
public class ManagedCursorTest extends MockedBookKeeperTestCase { deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10); Assert.assertNull(deletedIndexes); Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[5]); + + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[6]), 10); + Assert.assertEquals(deletedIndexes.size(), 1); + Assert.assertEquals(deletedIndexes.get(0).getStart(), 1); + Assert.assertEquals(deletedIndexes.get(0).getEnd(), 3); } private void deleteBatchIndex(ManagedCursor cursor, Position position, int batchSize,
