This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 23700dbe7eaf871a2160a47546dc3b03f51f3e1e Author: lipenghui <[email protected]> AuthorDate: Fri Jan 7 13:44:02 2022 +0800 Fix reader skipped remaining compacted data during the topic unloading. (#13629) ### Motivation This fixes the reader skipping the remaining compacted data after the topic has been unloaded. #11287 fixed the data-skipping issue for a reader that reads messages from the earliest position for the first time. However, if the reader has consumed some, but not all, of the messages in the compacted ledger, its start position is no longer `earliest`, so the broker rewinds the reader's cursor to the next valid position of the original topic. As a result, the remaining messages in the compacted ledger are skipped. Here are the logs from the broker: ``` 10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://xxx/product-full-prod/5126 - dedup is disabled 10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://xxx/product-full-prod/5126][xxx] Creating non-durable subscription at msg id 181759:14:-1:-1 10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl - [xxx/product-full-prod/persistent/5126] Created non-durable cursor read-position=221199:0 mark-delete-position=181759:13 10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx/product-full-prod/persistent/5126] Opened new cursor: NonDurableCursorImpl{ledger=xxx/product-full-prod/persistent/5126, ackPos=181759:13, readPos=221199:0} 10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [xxx/product-full-prod/persistent/5126-xxx] Rewind from 221199:0 to 221199:0 ``` There are many compacted messages after `181759:13`, but the broker will not dispatch them to
the reader. The issue can also be reproduced with the unit test added in this PR. ### Modification If the cursor has `readCompacted = true`, rewind it to the position immediately after the mark-delete position, so that the reader can continue reading data from the compacted ledger. ### Verification Added a new test verifying that the reader receives all the compacted and non-compacted messages from the topic while the topic is being unloaded. (cherry picked from commit 07f131fec7db36d6f424ce419d26888592b04207) --- .../apache/bookkeeper/mledger/ManagedLedger.java | 3 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +-- .../mledger/impl/NonDurableCursorImpl.java | 23 ++++++--- .../AbstractDispatcherSingleActiveConsumer.java | 4 -- .../broker/service/persistent/PersistentTopic.java | 8 ++-- .../pulsar/compaction/CompactedTopicTest.java | 55 ++++++++++++++++++++++ .../offload/jcloud/impl/MockManagedLedger.java | 4 +- 7 files changed, 84 insertions(+), 20 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 74c67e9..0200e25 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -259,7 +259,8 @@ public interface ManagedLedger { */ ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException; ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException; - ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition) throws ManagedLedgerException; + ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition, + boolean isReadCompacted) throws ManagedLedgerException; /** * Delete a ManagedCursor asynchronously.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index cec106c7..457600c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1030,11 +1030,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException { - return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest); + return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false); } @Override - public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition) + public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition, + boolean isReadCompacted) throws ManagedLedgerException { Objects.requireNonNull(cursorName, "cursor name can't be null"); checkManagedLedgerIsOpen(); @@ -1049,7 +1050,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName, - (PositionImpl) startCursorPosition, initialPosition); + (PositionImpl) startCursorPosition, initialPosition, isReadCompacted); cursor.setActive(); log.info("[{}] Opened new cursor: {}", name, cursor); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 0f7ffe41..1d545bd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -33,11 +33,12 @@ import org.slf4j.LoggerFactory; public class NonDurableCursorImpl extends ManagedCursorImpl { - private volatile boolean readCompacted; + private final boolean readCompacted; NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName, - PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) { + PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition, boolean isReadCompacted) { super(bookkeeper, config, ledger, cursorName); + this.readCompacted = isReadCompacted; // Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to // store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require @@ -67,7 +68,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { private void recoverCursor(PositionImpl mdPosition) { Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter(); - this.readPosition = ledger.getNextValidPosition(mdPosition); + this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition); markDeletePosition = mdPosition; // Initialize the counter such that the difference between the messages written on the ML and the @@ -118,15 +119,23 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { callback.deleteCursorComplete(ctx); } - public void setReadCompacted(boolean readCompacted) { - this.readCompacted = readCompacted; - } - public boolean isReadCompacted() { return readCompacted; } @Override + public void rewind() { + // For reading the compacted data, + // we couldn't reset the read position to the next valid position of the original topic. + // Otherwise, the remaining data in the compacted ledger will be skipped. 
+ if (!readCompacted) { + super.rewind(); + } else { + readPosition = markDeletePosition.getNext(); + } + } + + @Override public synchronized String toString() { return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition) .add("readPos", readPosition).toString(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 4c7ea45..8cab06b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; @@ -181,9 +180,6 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas consumer.notifyActiveConsumerChange(currentActiveConsumer); } } - if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) { - ((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted()); - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1b2e11f..e2cc79c 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -745,7 +745,7 @@ public class PersistentTopic extends AbstractTopic getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, - startMessageRollbackDurationSec); + startMessageRollbackDurationSec, readCompacted); int maxUnackedMessages = isDurable ? getMaxUnackedMessagesOnConsumer() @@ -891,7 +891,8 @@ public class PersistentTopic extends AbstractTopic } private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName, - MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec) { + MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec, + boolean isReadCompacted) { log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId); CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>(); @@ -924,7 +925,8 @@ public class PersistentTopic extends AbstractTopic Position startPosition = new PositionImpl(ledgerId, entryId); ManagedCursor cursor = null; try { - cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition); + cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition, + isReadCompacted); } catch (ManagedLedgerException e) { return FutureUtil.failedFuture(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index 4d00d28..accce4ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -734,4 +734,59 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS)); } + public void testReadCompleteMessagesDuringTopicUnloading() throws Exception { + String topic = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" + + UUID.randomUUID(); + final int numMessages = 1000; + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .blockIfQueueFull(true) + .enableBatching(false) + .create(); + CompletableFuture<MessageId> lastMessage = null; + for (int i = 0; i < numMessages; ++i) { + lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync(); + } + producer.flush(); + lastMessage.join(); + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); + Assert.assertEquals(stats.compactedLedger.entries, numMessages); + Assert.assertEquals(admin.topics().getStats(topic) + .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); + Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition); + }); + // Unload the topic to make sure the original ledger been deleted. + admin.topics().unload(topic); + // Produce more messages to the original topic + for (int i = 0; i < numMessages; ++i) { + lastMessage = producer.newMessage().key(i + numMessages + "").value(String.format("msg [%d]", i + numMessages)).sendAsync(); + } + producer.flush(); + lastMessage.join(); + // For now the topic has 1000 messages in the compacted ledger and 1000 messages in the original topic. 
+ @Cleanup + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageIdInclusive() + .startMessageId(MessageId.earliest) + .readCompacted(true) + .create(); + + // Unloading the topic during reading the data to make sure the reader will not miss any messages. + for (int i = 0; i < numMessages / 2; ++i) { + Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i)); + } + admin.topics().unload(topic); + for (int i = 0; i < numMessages / 2; ++i) { + Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages / 2)); + } + admin.topics().unload(topic); + for (int i = 0; i < numMessages; ++i) { + Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages)); + } + } } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 767190c..229cd66 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -122,8 +122,8 @@ public class MockManagedLedger implements ManagedLedger { @Override public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, - CommandSubscribe.InitialPosition initialPosition) throws - ManagedLedgerException { + CommandSubscribe.InitialPosition initialPosition, + boolean isReadCompacted) throws ManagedLedgerException { return null; }
