This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 39eea8a74e2 [Branch-2.7][Cherry-pick] Fix reader skipped remaining 
compacted data during the topic unloading (#16300)
39eea8a74e2 is described below

commit 39eea8a74e2f7d7e35fa85cb363e46341422a79b
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jul 5 17:52:54 2022 +0800

    [Branch-2.7][Cherry-pick] Fix reader skipped remaining compacted data 
during the topic unloading (#16300)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  3 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 ++++---
 .../mledger/impl/NonDurableCursorImpl.java         | 24 ++++++++++++++++++++--
 .../AbstractDispatcherSingleActiveConsumer.java    | 10 +++++----
 .../broker/service/persistent/PersistentTopic.java |  8 +++++---
 5 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4b06b5ee929..7fba1a42044 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -195,7 +195,8 @@ public interface ManagedLedger {
      */
     ManagedCursor newNonDurableCursor(Position startCursorPosition) throws 
ManagedLedgerException;
     ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName) throws ManagedLedgerException;
-    ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName, InitialPosition initialPosition) throws 
ManagedLedgerException;
+    ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName, InitialPosition initialPosition,
+                                      boolean isReadCompacted) throws 
ManagedLedgerException;
 
     /**
      * Delete a ManagedCursor asynchronously.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 78e00cc0836..db500e34b4a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -908,11 +908,12 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     @Override
     public ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName) throws ManagedLedgerException {
-        return newNonDurableCursor(startPosition, subscriptionName, 
InitialPosition.Latest);
+        return newNonDurableCursor(startPosition, subscriptionName, 
InitialPosition.Latest, false);
     }
 
     @Override
-    public ManagedCursor newNonDurableCursor(Position startCursorPosition, 
String cursorName, InitialPosition initialPosition)
+    public ManagedCursor newNonDurableCursor(Position startCursorPosition, 
String cursorName, InitialPosition initialPosition,
+                                             boolean isReadCompacted)
             throws ManagedLedgerException {
         Objects.requireNonNull(cursorName, "cursor name can't be null");
         checkManagedLedgerIsOpen();
@@ -927,7 +928,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
 
         NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, 
config, this, cursorName,
-                (PositionImpl) startCursorPosition, initialPosition);
+                (PositionImpl) startCursorPosition, initialPosition, 
isReadCompacted);
         cursor.setActive();
 
         log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 167bcec4acf..28ed13d808c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,9 +33,13 @@ import org.slf4j.LoggerFactory;
 
 public class NonDurableCursorImpl extends ManagedCursorImpl {
 
+    private final boolean readCompacted;
+
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, 
ManagedLedgerImpl ledger, String cursorName,
-                         PositionImpl startCursorPosition, 
PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
+                         PositionImpl startCursorPosition, 
PulsarApi.CommandSubscribe.InitialPosition initialPosition,
+                         boolean isReadCompacted) {
         super(bookkeeper, config, ledger, cursorName);
+        this.readCompacted = isReadCompacted;
 
         // Compare with "latest" position marker by using only the ledger id. 
Since the C++ client is using 48bits to
         // store the entryId, it's not able to pass a Long.max() as entryId. 
In this case there's no point to require
@@ -65,7 +69,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
 
     private void recoverCursor(PositionImpl mdPosition) {
         Pair<PositionImpl, Long> lastEntryAndCounter = 
ledger.getLastPositionAndCounter();
-        this.readPosition = ledger.getNextValidPosition(mdPosition);
+        this.readPosition = isReadCompacted() ? mdPosition.getNext() : 
ledger.getNextValidPosition(mdPosition);
         markDeletePosition = mdPosition;
 
         // Initialize the counter such that the difference between the 
messages written on the ML and the
@@ -116,6 +120,22 @@ public class NonDurableCursorImpl extends 
ManagedCursorImpl {
         callback.deleteCursorComplete(ctx);
     }
 
+    public boolean isReadCompacted() {
+        return readCompacted;
+    }
+
+    @Override
+    public void rewind() {
+        // When reading compacted data, we must not reset the read position
+        // to the next valid position of the original topic; otherwise, the
+        // remaining data in the compacted ledger would be skipped.
+        if (!readCompacted) {
+            super.rewind();
+        } else {
+            readPosition = markDeletePosition.getNext();
+        }
+    }
+
     @Override
     public synchronized String toString() {
         return MoreObjects.toStringHelper(this).add("ledger", 
ledger.getName()).add("ackPos", markDeletePosition)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 2ab83834d1e..5d486528f7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -38,8 +37,10 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractDispatcherSingleActiveConsumer extends 
AbstractBaseDispatcher {
 
     protected final String topicName;
-    protected static final 
AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer> 
ACTIVE_CONSUMER_UPDATER =
-            
AtomicReferenceFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class,
 Consumer.class, "activeConsumer");
+    protected static final 
AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer>
+            ACTIVE_CONSUMER_UPDATER =
+            
AtomicReferenceFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class,
+                    Consumer.class, "activeConsumer");
     private volatile Consumer activeConsumer = null;
     protected final CopyOnWriteArrayList<Consumer> consumers;
     protected StickyKeyConsumerSelector stickyKeyConsumerSelector;
@@ -146,7 +147,8 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         }
 
         if (subscriptionType == SubType.Failover && 
isConsumersExceededOnSubscription()) {
-            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", this.topicName);
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit",
+                    this.topicName);
             throw new ConsumerBusyException("Subscription reached max 
consumers limit");
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c850aa8d281..67f17d4a4c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -662,7 +662,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         CompletableFuture<? extends Subscription> subscriptionFuture = 
isDurable ? //
                 getDurableSubscription(subscriptionName, initialPosition, 
startMessageRollbackDurationSec, replicatedSubscriptionState) //
-                : getNonDurableSubscription(subscriptionName, startMessageId, 
initialPosition, startMessageRollbackDurationSec);
+                : getNonDurableSubscription(subscriptionName, startMessageId, 
initialPosition, startMessageRollbackDurationSec, readCompacted);
 
         int maxUnackedMessages = isDurable
                 ? getMaxUnackedMessagesOnConsumer()
@@ -776,7 +776,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     private CompletableFuture<? extends Subscription> 
getNonDurableSubscription(String subscriptionName,
-            MessageId startMessageId, InitialPosition initialPosition, long 
startMessageRollbackDurationSec) {
+            MessageId startMessageId, InitialPosition initialPosition, long 
startMessageRollbackDurationSec,
+            boolean isReadCompacted) {
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", 
topic, subscriptionName, startMessageId);
 
         CompletableFuture<Subscription> subscriptionFuture = new 
CompletableFuture<>();
@@ -809,7 +810,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 Position startPosition = new PositionImpl(ledgerId, entryId);
                 ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, 
subscriptionName, initialPosition);
+                    cursor = ledger.newNonDurableCursor(startPosition, 
subscriptionName, initialPosition,
+                            isReadCompacted);
                 } catch (ManagedLedgerException e) {
                     return FutureUtil.failedFuture(e);
                 }

Reply via email to