This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ea40033a5e456b147496269892c3cd77b0236aed Author: lipenghui <[email protected]> AuthorDate: Fri Nov 5 00:55:59 2021 +0800 [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602) * [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction. For a non-durable cursor, the ledger trimming task makes the cursor skip the removed ledgers, so that readers do not introduce backlog and the data can be removed once it is past the retention; see #6787 for more details. But for a topic with compaction enabled, this causes the reader to skip the compacted data. The newly added test illustrates this problem well. When reading compacted data, reading a message ID that is earlier than the first message ID of the original data is normal behavior, so we should not move forward a cursor that will read the compacted data. * Fix checkstyle. * Fix tests. * Fix tests. (cherry picked from commit a6b1b34a5c028b74bd44c5b8f32b42752b6cec14) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +- .../mledger/impl/NonDurableCursorImpl.java | 10 +++ .../AbstractDispatcherSingleActiveConsumer.java | 10 ++- ...onPersistentDispatcherSingleActiveConsumer.java | 2 +- .../PersistentDispatcherSingleActiveConsumer.java | 4 +- .../pulsar/compaction/CompactedTopicTest.java | 73 ++++++++++++++++++++++ 6 files changed, 95 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 126acdb..fa62ebe 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2543,7 +2543,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // move the mark delete position to the 
highestPositionToDelete only if it is smaller than the add confirmed // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0 - && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) { + && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 + && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) { cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 15b1f04..0f7ffe41 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory; public class NonDurableCursorImpl extends ManagedCursorImpl { + private volatile boolean readCompacted; + NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName, PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) { super(bookkeeper, config, ledger, cursorName); @@ -116,6 +118,14 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { callback.deleteCursorComplete(ctx); } + public void setReadCompacted(boolean readCompacted) { + this.readCompacted = readCompacted; + } + + public boolean isReadCompacted() { + return readCompacted; + } + @Override public synchronized String toString() { return MoreObjects.toStringHelper(this).add("ledger", 
ledger.getName()).add("ackPos", markDeletePosition) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 690a598..4c7ea45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -26,6 +26,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; @@ -45,7 +47,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas protected boolean isKeyHashRangeFiltered = false; protected CompletableFuture<Void> closeFuture = null; protected final int partitionIndex; - + protected final ManagedCursor cursor; // This dispatcher supports both the Exclusive and Failover subscription types protected final SubType subscriptionType; @@ -59,12 +61,13 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, String topicName, Subscription subscription, - ServiceConfiguration serviceConfig) { + ServiceConfiguration serviceConfig, ManagedCursor cursor) { super(subscription, serviceConfig); this.topicName = topicName; this.consumers = new CopyOnWriteArrayList<>(); this.partitionIndex = 
partitionIndex; this.subscriptionType = subscriptionType; + this.cursor = cursor; ACTIVE_CONSUMER_UPDATER.set(this, null); } @@ -178,6 +181,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas consumer.notifyActiveConsumerChange(currentActiveConsumer); } } + if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) { + ((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted()); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index 6094ab7..5cdbff1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -44,7 +44,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, NonPersistentTopic topic, Subscription subscription) { super(subscriptionType, partitionIndex, topic.getName(), subscription, - topic.getBrokerService().pulsar().getConfiguration()); + topic.getBrokerService().pulsar().getConfiguration(), null); this.topic = topic; this.subscription = subscription; this.msgDrop = new Rate(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 04b8f5a..1f6cbec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -58,7 +58,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher implements Dispatcher, ReadEntriesCallback { protected final PersistentTopic topic; - protected final ManagedCursor cursor; protected final String name; private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty(); @@ -73,11 +72,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, PersistentTopic topic, Subscription subscription) { super(subscriptionType, partitionIndex, topic.getName(), subscription, - topic.getBrokerService().pulsar().getConfiguration()); + topic.getBrokerService().pulsar().getConfiguration(), cursor); this.topic = topic; this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName()) : ""/* NonDurableCursor doesn't have name */); - this.cursor = cursor; this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index 608d99b..cbe7372 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -509,4 +509,77 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { reader.close(); producer.close(); } + + @Test + public void testReadCompactedDataWhenLedgerRolloverKickIn() throws Exception { + String topic = 
"persistent://my-property/use/my-ns/testReadCompactedDataWhenLedgerRolloverKickIn-" + + UUID.randomUUID(); + final int numMessages = 2000; + final int keys = 200; + final String msg = "Test"; + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .blockIfQueueFull(true) + .maxPendingMessages(numMessages) + .enableBatching(false) + .create(); + CompletableFuture<MessageId> lastMessage = null; + for (int i = 0; i < numMessages; ++i) { + lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync(); + } + producer.flush(); + lastMessage.join(); + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); + Assert.assertEquals(stats.compactedLedger.entries, keys); + Assert.assertEquals(admin.topics().getStats(topic) + .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); + }); + // Send more 200 keys + for (int i = 0; i < numMessages; ++i) { + lastMessage = producer.newMessage().key((i % keys + keys) + "").value(msg).sendAsync(); + } + producer.flush(); + lastMessage.join(); + + // Make sure we have more than 1 original ledgers + admin.topics().unload(topic); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(admin.topics().getInternalStats(topic).ledgers.size(), 2); + }); + + // Start a new reader to reading messages + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .readCompacted(true) + .receiverQueueSize(10) + .create(); + + // Send more 200 keys + for (int i = 0; i < numMessages; ++i) { + lastMessage = producer.newMessage().key((i % keys + keys * 2) + "").value(msg).sendAsync(); + } + producer.flush(); + lastMessage.join(); + + admin.topics().triggerCompaction(topic); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic); + Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1); + Assert.assertEquals(stats.compactedLedger.entries, keys * 3); + Assert.assertEquals(admin.topics().getStats(topic) + .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0); + }); + + // The reader should read all 600 keys + int received = 0; + while (reader.hasMessageAvailable()) { + System.out.println(reader.readNext().getKey()); + received++; + } + Assert.assertEquals(received, keys * 3); + } }
