This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea40033a5e456b147496269892c3cd77b0236aed
Author: lipenghui <[email protected]>
AuthorDate: Fri Nov 5 00:55:59 2021 +0800

    [Compaction] Do not move the non-durable cursor position when trimming 
ledgers while topic with compaction (#12602)
    
    * [Compaction] Do not move the non-durable cursor position when trimming 
ledgers while topic with compaction.
    
    For the non-durable cursor, the ledgers trimming task will cause skip the 
removed ledgers
    to avoid readers introduced backlogs and make sure the data can be removed 
if over the retention,
    more details to see #6787.
    
    But for a topic which enabled compaction, this will lead to the reader 
skips the compacted data.
    The new added test can illustrate this problem well. For reading compacted 
data, reading a message ID
    that earlier that the first message ID of the original data is a normal 
behavior, so we should not
    move forward the cursor which will read the compacted data.
    
    * Fix checkstyle.
    
    * Fix tests.
    
    * Fix tests.
    
    (cherry picked from commit a6b1b34a5c028b74bd44c5b8f32b42752b6cec14)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../mledger/impl/NonDurableCursorImpl.java         | 10 +++
 .../AbstractDispatcherSingleActiveConsumer.java    | 10 ++-
 ...onPersistentDispatcherSingleActiveConsumer.java |  2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  4 +-
 .../pulsar/compaction/CompactedTopicTest.java      | 73 ++++++++++++++++++++++
 6 files changed, 95 insertions(+), 7 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 126acdb..fa62ebe 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2543,7 +2543,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             // move the mark delete position to the highestPositionToDelete 
only if it is smaller than the add confirmed
             // to prevent the edge case where the cursor is caught up to the 
latest and highestPositionToDelete may be larger than the last add confirmed
             if (highestPositionToDelete.compareTo((PositionImpl) 
cursor.getMarkDeletedPosition()) > 0
-                    && highestPositionToDelete.compareTo((PositionImpl) 
cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
+                    && highestPositionToDelete.compareTo((PositionImpl) 
cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
+                    && !(!cursor.isDurable() && cursor instanceof 
NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
                 cursor.asyncMarkDelete(highestPositionToDelete, new 
MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 15b1f04..0f7ffe41 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 public class NonDurableCursorImpl extends ManagedCursorImpl {
 
+    private volatile boolean readCompacted;
+
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, 
ManagedLedgerImpl ledger, String cursorName,
                          PositionImpl startCursorPosition, 
CommandSubscribe.InitialPosition initialPosition) {
         super(bookkeeper, config, ledger, cursorName);
@@ -116,6 +118,14 @@ public class NonDurableCursorImpl extends 
ManagedCursorImpl {
         callback.deleteCursorComplete(ctx);
     }
 
+    public void setReadCompacted(boolean readCompacted) {
+        this.readCompacted = readCompacted;
+    }
+
+    public boolean isReadCompacted() {
+        return readCompacted;
+    }
+
     @Override
     public synchronized String toString() {
         return MoreObjects.toStringHelper(this).add("ledger", 
ledger.getName()).add("ackPos", markDeletePosition)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 690a598..4c7ea45 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -26,6 +26,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -45,7 +47,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer 
extends AbstractBas
     protected boolean isKeyHashRangeFiltered = false;
     protected CompletableFuture<Void> closeFuture = null;
     protected final int partitionIndex;
-
+    protected final ManagedCursor cursor;
     // This dispatcher supports both the Exclusive and Failover subscription 
types
     protected final SubType subscriptionType;
 
@@ -59,12 +61,13 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
 
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, 
int partitionIndex,
                                                   String topicName, 
Subscription subscription,
-                                                  ServiceConfiguration 
serviceConfig) {
+                                                  ServiceConfiguration 
serviceConfig, ManagedCursor cursor) {
         super(subscription, serviceConfig);
         this.topicName = topicName;
         this.consumers = new CopyOnWriteArrayList<>();
         this.partitionIndex = partitionIndex;
         this.subscriptionType = subscriptionType;
+        this.cursor = cursor;
         ACTIVE_CONSUMER_UPDATER.set(this, null);
     }
 
@@ -178,6 +181,9 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
                 consumer.notifyActiveConsumerChange(currentActiveConsumer);
             }
         }
+        if (cursor != null && !cursor.isDurable() && cursor instanceof 
NonDurableCursorImpl) {
+            ((NonDurableCursorImpl) 
cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
+        }
 
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 6094ab7..5cdbff1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -44,7 +44,7 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     public NonPersistentDispatcherSingleActiveConsumer(SubType 
subscriptionType, int partitionIndex,
                                                        NonPersistentTopic 
topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), null);
         this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 04b8f5a..1f6cbec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -58,7 +58,6 @@ public class PersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcher
         implements Dispatcher, ReadEntriesCallback {
 
     protected final PersistentTopic topic;
-    protected final ManagedCursor cursor;
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
 
@@ -73,11 +72,10 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, 
SubType subscriptionType, int partitionIndex,
                                                     PersistentTopic topic, 
Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), cursor);
         this.topic = topic;
         this.name = topic.getName() + " / " + (cursor.getName() != null ? 
Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
-        this.cursor = cursor;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.readFailureBackoff = new 
Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
             TimeUnit.MILLISECONDS, 
serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 608d99b..cbe7372 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -509,4 +509,77 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testReadCompactedDataWhenLedgerRolloverKickIn() throws 
Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testReadCompactedDataWhenLedgerRolloverKickIn-"
 +
+                UUID.randomUUID();
+        final int numMessages = 2000;
+        final int keys = 200;
+        final String msg = "Test";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(numMessages)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i % keys + 
"").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+        // Send more 200 keys
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys) + 
"").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+
+        // Make sure we have more than 1 original ledgers
+        admin.topics().unload(topic);
+        Awaitility.await().untilAsserted(() -> {
+            
Assert.assertEquals(admin.topics().getInternalStats(topic).ledgers.size(), 2);
+        });
+
+        // Start a new reader to reading messages
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .receiverQueueSize(10)
+                .create();
+
+        // Send more 200 keys
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys * 2) + 
"").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys * 3);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+
+        // The reader should read all 600 keys
+        int received = 0;
+        while (reader.hasMessageAvailable()) {
+            System.out.println(reader.readNext().getKey());
+            received++;
+        }
+        Assert.assertEquals(received, keys * 3);
+    }
 }

Reply via email to