This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 23700dbe7eaf871a2160a47546dc3b03f51f3e1e
Author: lipenghui <[email protected]>
AuthorDate: Fri Jan 7 13:44:02 2022 +0800

    Fix reader skipped remaining compacted data during the topic unloading. 
(#13629)
    
    ### Motivation
    
    Fix the reader skipping the remaining compacted data after the topic has 
been unloaded.
    #11287 fixed the skipped-data issue for a reader that reads the messages 
for the first time
    from the earliest position. But if the reader has consumed some, but not 
all, of the messages from the
    compacted ledger, the start position will not be `earliest`, and 
the broker
    will rewind the cursor for the reader to the next valid position of the 
original topic.
    As a result, the remaining messages in the compacted ledger will be skipped.
    
    Here are the logs from the broker:
    
    ```
    10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  
org.apache.pulsar.broker.service.BrokerService - Created topic 
persistent://xxx/product-full-prod/5126 - dedup is disabled
    10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  
org.apache.pulsar.broker.service.persistent.PersistentTopic - 
[persistent://xxx/product-full-prod/5126][xxx] Creating non-durable 
subscription at msg id 181759:14:-1:-1
    10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  
org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl - 
[xxx/product-full-prod/persistent/5126] Created non-durable cursor 
read-position=221199:0 mark-delete-position=181759:13
    10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - 
[xxx/product-full-prod/persistent/5126] Opened new cursor: 
NonDurableCursorImpl{ledger=xxx/product-full-prod/persistent/5126, 
ackPos=181759:13, readPos=221199:0}
    10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - 
[xxx/product-full-prod/persistent/5126-xxx] Rewind from 221199:0 to 221199:0
    ```
    
    There are many compacted messages after `181759:13`, but the broker will 
not dispatch them to the reader.
    The issue also can be reproduced by the unit test that was added in this PR.
    
    ### Modification
    
    If the cursor is created with `readCompacted = true`, just rewind to the 
message after the mark-delete position,
    so that the reader can continue to read the data from the compacted ledger.
    
    ### Verification
    
    A new test was added to verify that the reader gets all the compacted 
and non-compacted messages from the topic when the topic is unloaded during 
reading.
    
    (cherry picked from commit 07f131fec7db36d6f424ce419d26888592b04207)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  3 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 +--
 .../mledger/impl/NonDurableCursorImpl.java         | 23 ++++++---
 .../AbstractDispatcherSingleActiveConsumer.java    |  4 --
 .../broker/service/persistent/PersistentTopic.java |  8 ++--
 .../pulsar/compaction/CompactedTopicTest.java      | 55 ++++++++++++++++++++++
 .../offload/jcloud/impl/MockManagedLedger.java     |  4 +-
 7 files changed, 84 insertions(+), 20 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 74c67e9..0200e25 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -259,7 +259,8 @@ public interface ManagedLedger {
      */
     ManagedCursor newNonDurableCursor(Position startCursorPosition) throws 
ManagedLedgerException;
     ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName) throws ManagedLedgerException;
-    ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName, InitialPosition initialPosition) throws 
ManagedLedgerException;
+    ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName, InitialPosition initialPosition,
+                                      boolean isReadCompacted) throws 
ManagedLedgerException;
 
     /**
      * Delete a ManagedCursor asynchronously.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index cec106c7..457600c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1030,11 +1030,12 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
 
     @Override
     public ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName) throws ManagedLedgerException {
-        return newNonDurableCursor(startPosition, subscriptionName, 
InitialPosition.Latest);
+        return newNonDurableCursor(startPosition, subscriptionName, 
InitialPosition.Latest, false);
     }
 
     @Override
-    public ManagedCursor newNonDurableCursor(Position startCursorPosition, 
String cursorName, InitialPosition initialPosition)
+    public ManagedCursor newNonDurableCursor(Position startCursorPosition, 
String cursorName, InitialPosition initialPosition,
+                                             boolean isReadCompacted)
             throws ManagedLedgerException {
         Objects.requireNonNull(cursorName, "cursor name can't be null");
         checkManagedLedgerIsOpen();
@@ -1049,7 +1050,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
 
         NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, 
config, this, cursorName,
-                (PositionImpl) startCursorPosition, initialPosition);
+                (PositionImpl) startCursorPosition, initialPosition, 
isReadCompacted);
         cursor.setActive();
 
         log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 0f7ffe41..1d545bd 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,11 +33,12 @@ import org.slf4j.LoggerFactory;
 
 public class NonDurableCursorImpl extends ManagedCursorImpl {
 
-    private volatile boolean readCompacted;
+    private final boolean readCompacted;
 
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, 
ManagedLedgerImpl ledger, String cursorName,
-                         PositionImpl startCursorPosition, 
CommandSubscribe.InitialPosition initialPosition) {
+                         PositionImpl startCursorPosition, 
CommandSubscribe.InitialPosition initialPosition, boolean isReadCompacted) {
         super(bookkeeper, config, ledger, cursorName);
+        this.readCompacted = isReadCompacted;
 
         // Compare with "latest" position marker by using only the ledger id. 
Since the C++ client is using 48bits to
         // store the entryId, it's not able to pass a Long.max() as entryId. 
In this case there's no point to require
@@ -67,7 +68,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
 
     private void recoverCursor(PositionImpl mdPosition) {
         Pair<PositionImpl, Long> lastEntryAndCounter = 
ledger.getLastPositionAndCounter();
-        this.readPosition = ledger.getNextValidPosition(mdPosition);
+        this.readPosition = isReadCompacted() ? mdPosition.getNext() : 
ledger.getNextValidPosition(mdPosition);
         markDeletePosition = mdPosition;
 
         // Initialize the counter such that the difference between the 
messages written on the ML and the
@@ -118,15 +119,23 @@ public class NonDurableCursorImpl extends 
ManagedCursorImpl {
         callback.deleteCursorComplete(ctx);
     }
 
-    public void setReadCompacted(boolean readCompacted) {
-        this.readCompacted = readCompacted;
-    }
-
     public boolean isReadCompacted() {
         return readCompacted;
     }
 
     @Override
+    public void rewind() {
+        // For reading the compacted data,
+        // we couldn't reset the read position to the next valid position of 
the original topic.
+        // Otherwise, the remaining data in the compacted ledger will be 
skipped.
+        if (!readCompacted) {
+            super.rewind();
+        } else {
+            readPosition = markDeletePosition.getNext();
+        }
+    }
+
+    @Override
     public synchronized String toString() {
         return MoreObjects.toStringHelper(this).add("ledger", 
ledger.getName()).add("ackPos", markDeletePosition)
                 .add("readPos", readPosition).toString();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 4c7ea45..8cab06b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -181,9 +180,6 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
                 consumer.notifyActiveConsumerChange(currentActiveConsumer);
             }
         }
-        if (cursor != null && !cursor.isDurable() && cursor instanceof 
NonDurableCursorImpl) {
-            ((NonDurableCursorImpl) 
cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
-        }
 
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1b2e11f..e2cc79c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -745,7 +745,7 @@ public class PersistentTopic extends AbstractTopic
                     getDurableSubscription(subscriptionName, initialPosition, 
startMessageRollbackDurationSec,
                             replicatedSubscriptionState)
                     : getNonDurableSubscription(subscriptionName, 
startMessageId, initialPosition,
-                    startMessageRollbackDurationSec);
+                    startMessageRollbackDurationSec, readCompacted);
 
             int maxUnackedMessages = isDurable
                     ? getMaxUnackedMessagesOnConsumer()
@@ -891,7 +891,8 @@ public class PersistentTopic extends AbstractTopic
     }
 
     private CompletableFuture<? extends Subscription> 
getNonDurableSubscription(String subscriptionName,
-            MessageId startMessageId, InitialPosition initialPosition, long 
startMessageRollbackDurationSec) {
+            MessageId startMessageId, InitialPosition initialPosition, long 
startMessageRollbackDurationSec,
+            boolean isReadCompacted) {
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", 
topic, subscriptionName, startMessageId);
 
         CompletableFuture<Subscription> subscriptionFuture = new 
CompletableFuture<>();
@@ -924,7 +925,8 @@ public class PersistentTopic extends AbstractTopic
                 Position startPosition = new PositionImpl(ledgerId, entryId);
                 ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, 
subscriptionName, initialPosition);
+                    cursor = ledger.newNonDurableCursor(startPosition, 
subscriptionName, initialPosition,
+                            isReadCompacted);
                 } catch (ManagedLedgerException e) {
                     return FutureUtil.failedFuture(e);
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 4d00d28..accce4ef 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -734,4 +734,59 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
     }
 
+    public void testReadCompleteMessagesDuringTopicUnloading() throws 
Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-"
 +
+                UUID.randomUUID();
+        final int numMessages = 1000;
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + 
"").value(String.format("msg [%d]", i)).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, numMessages);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+            Assert.assertEquals(stats.lastConfirmedEntry, 
stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+        });
+        // Unload the topic to make sure the original ledger been deleted.
+        admin.topics().unload(topic);
+        // Produce more messages to the original topic
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + numMessages + 
"").value(String.format("msg [%d]", i + numMessages)).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        // For now the topic has 1000 messages in the compacted ledger and 
1000 messages in the original topic.
+        @Cleanup
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .create();
+
+        // Unloading the topic during reading the data to make sure the reader 
will not miss any messages.
+        for (int i = 0; i < numMessages / 2; ++i) {
+            Assert.assertEquals(reader.readNext().getValue(), 
String.format("msg [%d]", i));
+        }
+        admin.topics().unload(topic);
+        for (int i = 0; i < numMessages / 2; ++i) {
+            Assert.assertEquals(reader.readNext().getValue(), 
String.format("msg [%d]", i + numMessages / 2));
+        }
+        admin.topics().unload(topic);
+        for (int i = 0; i < numMessages; ++i) {
+            Assert.assertEquals(reader.readNext().getValue(), 
String.format("msg [%d]", i + numMessages));
+        }
+    }
 }
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 767190c..229cd66 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -122,8 +122,8 @@ public class MockManagedLedger implements ManagedLedger {
 
     @Override
     public ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName,
-                                             CommandSubscribe.InitialPosition 
initialPosition) throws
-            ManagedLedgerException {
+                                             CommandSubscribe.InitialPosition 
initialPosition,
+                                             boolean isReadCompacted) throws 
ManagedLedgerException {
         return null;
     }
 

Reply via email to