This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1fbc7ed9ab0bbe1c75d6576bc37933afd4bbc42f Author: lipenghui <[email protected]> AuthorDate: Thu Oct 21 09:52:47 2021 +0800 Fix compactor skips data from last compacted Ledger (#12429) ## Motivation This PR fixes the loss of compacted data during data compaction. We saw only a few deletion events, yet the number of compacted events obviously dropped a lot.  After investigating the issue in more detail, we found that only the first read operation reads data from the compacted ledger; from the second read operation onward, the broker starts reading data from the original topic. ``` 2021-10-19T23:09:30,021+0800 [broker-topic-workers-OrderedScheduler-7-0] INFO org.apache.pulsar.compaction.CompactedTopicImpl - =====[public/default/persistent/c499d42c-75d7-48d1-9225-2e724c0e1d83] Read from compacted Ledger = cursor position: -1:-1, Horizon: 16:-1, isFirstRead: true 2021-10-19T23:09:30,049+0800 [broker-topic-workers-OrderedScheduler-7-0] INFO org.apache.pulsar.compaction.CompactedTopicImpl - =====[public/default/persistent/c499d42c-75d7-48d1-9225-2e724c0e1d83] Read from original Ledger = cursor position: 16:0, Horizon: 16:-1, isFirstRead: false ``` ## Modifications The compaction task depends on the last snapshot and the incremental entries to build the new snapshot. So, for the compaction cursor, we need to force-seek the read position to ensure the compactor can read the complete last snapshot, because the compactor reads data located before the compaction cursor's mark-delete position. ## Verifying this change A new test was added to check that the compacted data will not be lost. 
(cherry picked from commit 1830f90d08acc079c6ee5a5ec05751ab4cbee490) --- .../apache/bookkeeper/mledger/ManagedCursor.java | 6 ++- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 +-- .../mledger/impl/ManagedCursorContainerTest.java | 2 +- .../pulsar/compaction/CompactedTopicImpl.java | 9 ++++- .../pulsar/compaction/CompactedTopicTest.java | 45 ++++++++++++++++++++++ 5 files changed, 61 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 4af6455..72ee1a1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -425,7 +425,11 @@ public interface ManagedCursor { * @param newReadPosition * the position where to move the cursor */ - void seek(Position newReadPosition); + default void seek(Position newReadPosition) { + seek(newReadPosition, false); + } + + void seek(Position newReadPosition, boolean force); /** * Clear the cursor backlog. 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 82b4cfc..8e794df 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2163,18 +2163,16 @@ public class ManagedCursorImpl implements ManagedCursor { } @Override - public void seek(Position newReadPositionInt) { + public void seek(Position newReadPositionInt, boolean force) { checkArgument(newReadPositionInt instanceof PositionImpl); PositionImpl newReadPosition = (PositionImpl) newReadPositionInt; lock.writeLock().lock(); try { - if (newReadPosition.compareTo(markDeletePosition) <= 0) { + if (!force && newReadPosition.compareTo(markDeletePosition) <= 0) { // Make sure the newReadPosition comes after the mark delete position newReadPosition = ledger.getNextValidPosition(markDeletePosition); } - - PositionImpl oldReadPosition = readPosition; readPosition = newReadPosition; } finally { lock.writeLock().unlock(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 6b9c009..57e1964 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -175,7 +175,7 @@ public class ManagedCursorContainerTest { } @Override - public void seek(Position newReadPosition) { + public void seek(Position newReadPosition, boolean force) { } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 1313413..4375188 
100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.compaction; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.ComparisonChain; @@ -122,7 +123,13 @@ public class CompactedTopicImpl implements CompactedTopic { return readEntries(context.ledger, startPoint, endPoint) .thenAccept((entries) -> { Entry lastEntry = entries.get(entries.size() - 1); - cursor.seek(lastEntry.getPosition().getNext()); + // The compaction task depends on the last snapshot and the incremental + // entries to build the new snapshot. So for the compaction cursor, we + // need to force seek the read position to ensure the compactor can read + // the complete last snapshot because of the compactor will read the data + // before the compaction cursor mark delete position + cursor.seek(lastEntry.getPosition().getNext(), + cursor.getName().equals(COMPACTION_SUBSCRIPTION)); callback.readEntriesComplete(entries, consumer); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index 3d41088..adb6f46 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.compaction; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.google.common.collect.Sets; @@ -437,4 +438,48 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { reader.readNext(); 
Assert.assertFalse(reader.hasMessageAvailable()); } + + @Test + public void testDoNotLossTheLastCompactedLedgerData() throws Exception { + String topic = "persistent://my-property/use/my-ns/testDoNotLossTheLastCompactedLedgerData-" + + UUID.randomUUID(); + final int numMessages = 2000; + final int keys = 200; + final String msg = "Test"; + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .blockIfQueueFull(true) + .maxPendingMessages(numMessages) + .enableBatching(false) + .create(); + CompletableFuture<MessageId> lastMessage = null; + for (int i = 0; i < numMessages; ++i) { + lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync(); + } + producer.flush(); + lastMessage.join(); + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); + Assert.assertEquals(stats.compactedLedger.entries, keys); + Assert.assertEquals(admin.topics().getStats(topic) + .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); + }); + admin.topics().unload(topic); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Assert.assertEquals(stats.ledgers.size(), 1); + Assert.assertEquals(admin.topics().getStats(topic) + .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); + }); + admin.topics().unload(topic); + // Send one more key to and then to trigger the compaction + producer.newMessage().key(keys + "").value(msg).send(); + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Assert.assertEquals(stats.compactedLedger.entries, keys + 1); + }); + } }
