This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ddd77c985252a7f9ec87005f616a41a4ea1128f6 Author: Addison Higham <[email protected]> AuthorDate: Sun Feb 28 22:54:05 2021 -0700 Fix marking individual deletes as dirty (#9732) * Fix marking individual deletes as dirty When we mark cursors as dirty, we aren't marking when individual acks cause a dirty cursor. This results in cursors not being flushed and causing redelivery. This one line fix will ensure we mark the cursor as dirty in this situation as well * add a test * improve tests to not use sleep * make the polling rate be slower (cherry picked from commit 34ca8938ca13ea60b05d164c27d9755855caf87c) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 + .../bookkeeper/mledger/impl/ManagedCursorTest.java | 87 ++++++++++++++++++++-- 2 files changed, 83 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8e02b89..4ff912f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1942,6 +1942,7 @@ public class ManagedCursorImpl implements ManagedCursor { // Apply rate limiting to mark-delete operations if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { + isDirty = true; PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition; LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null)); @@ -2929,6 +2930,9 @@ public class ManagedCursorImpl implements ManagedCursor { asyncMarkDelete(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Flushed dirty mark-delete position", ledger.getName(), name); + } } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index bcdbeca..4f6e1a0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -36,6 +36,7 @@ import com.google.common.collect.Range; import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.nio.charset.Charset; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -3336,18 +3337,88 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); - // Give chance to the flush to be automatically triggered. - Thread.sleep(3000); + Awaitility.await() + // Give chance to the flush to be automatically triggered. + // NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting + .pollDelay(Duration.ofMillis(2000)) + .untilAsserted(() -> { + // Abruptly re-open the managed ledger without graceful close + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + try { + ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); + ManagedCursor c2 = ledger2.openCursor("c"); - // Abruptly re-open the managed ledger without graceful close - ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); - ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); - ManagedCursor c2 = ledger2.openCursor("c"); + assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); - assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + } finally { + factory2.shutdown(); + } + }); factory1.shutdown(); - factory2.shutdown(); + } + + @Test + public void testFlushCursorAfterIndividualDeleteInactivity() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setThrottleMarkDelete(1.0); + + ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig(); + factoryConfig.setCursorPositionFlushSeconds(1); + ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), factoryConfig); + ManagedLedger ledger1 = factory1.open("testFlushCursorAfterIndDelInactivity", config); + ManagedCursor c1 = ledger1.openCursor("c"); + List<Position> positions = new ArrayList<Position>(); + + for (int i = 0; i < 20; i++) { + positions.add(ledger1.addEntry(new byte[1024])); + } + + CountDownLatch latch = new CountDownLatch(positions.size()); + + positions.forEach(p -> c1.asyncDelete(p, new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + throw new RuntimeException(exception); + } + }, null)); + + latch.await(); + + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + + // reopen the cursor and we should see entries not be flushed + ManagedLedgerFactory dirtyFactory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ManagedLedger ledgerDirty = dirtyFactory.open("testFlushCursorAfterIndDelInactivity", config); + ManagedCursor dirtyCursor = ledgerDirty.openCursor("c"); + + assertNotEquals(dirtyCursor.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + + Awaitility.await() + // Give chance to the flush to be automatically triggered. + // NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting + .pollDelay(Duration.ofMillis(2000)) + .untilAsserted(() -> { + // Abruptly re-open the managed ledger without graceful close + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + try { + ManagedLedger ledger2 = factory2.open("testFlushCursorAfterIndDelInactivity", config); + ManagedCursor c2 = ledger2.openCursor("c"); + + assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + + } finally { + factory2.shutdown(); + } + }); + + factory1.shutdown(); + dirtyFactory.shutdown(); } private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
