This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 39eea8a74e2 [Branch-2.7][Cherry-pick] Fix reader skipped remaining
compacted data during the topic unloading (#16300)
39eea8a74e2 is described below
commit 39eea8a74e2f7d7e35fa85cb363e46341422a79b
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jul 5 17:52:54 2022 +0800
[Branch-2.7][Cherry-pick] Fix reader skipped remaining compacted data
during the topic unloading (#16300)
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 3 ++-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 ++++---
.../mledger/impl/NonDurableCursorImpl.java | 24 ++++++++++++++++++++--
.../AbstractDispatcherSingleActiveConsumer.java | 10 +++++----
.../broker/service/persistent/PersistentTopic.java | 8 +++++---
5 files changed, 39 insertions(+), 13 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4b06b5ee929..7fba1a42044 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -195,7 +195,8 @@ public interface ManagedLedger {
*/
ManagedCursor newNonDurableCursor(Position startCursorPosition) throws
ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String
subscriptionName) throws ManagedLedgerException;
- ManagedCursor newNonDurableCursor(Position startPosition, String
subscriptionName, InitialPosition initialPosition) throws
ManagedLedgerException;
+ ManagedCursor newNonDurableCursor(Position startPosition, String
subscriptionName, InitialPosition initialPosition,
+ boolean isReadCompacted) throws
ManagedLedgerException;
/**
* Delete a ManagedCursor asynchronously.
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 78e00cc0836..db500e34b4a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -908,11 +908,12 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public ManagedCursor newNonDurableCursor(Position startPosition, String
subscriptionName) throws ManagedLedgerException {
- return newNonDurableCursor(startPosition, subscriptionName,
InitialPosition.Latest);
+ return newNonDurableCursor(startPosition, subscriptionName,
InitialPosition.Latest, false);
}
@Override
- public ManagedCursor newNonDurableCursor(Position startCursorPosition,
String cursorName, InitialPosition initialPosition)
+ public ManagedCursor newNonDurableCursor(Position startCursorPosition,
String cursorName, InitialPosition initialPosition,
+ boolean isReadCompacted)
throws ManagedLedgerException {
Objects.requireNonNull(cursorName, "cursor name can't be null");
checkManagedLedgerIsOpen();
@@ -927,7 +928,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper,
config, this, cursorName,
- (PositionImpl) startCursorPosition, initialPosition);
+ (PositionImpl) startCursorPosition, initialPosition,
isReadCompacted);
cursor.setActive();
log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 167bcec4acf..28ed13d808c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,9 +33,13 @@ import org.slf4j.LoggerFactory;
public class NonDurableCursorImpl extends ManagedCursorImpl {
+ private final boolean readCompacted;
+
NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config,
ManagedLedgerImpl ledger, String cursorName,
- PositionImpl startCursorPosition,
PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
+ PositionImpl startCursorPosition,
PulsarApi.CommandSubscribe.InitialPosition initialPosition,
+ boolean isReadCompacted) {
super(bookkeeper, config, ledger, cursorName);
+ this.readCompacted = isReadCompacted;
// Compare with "latest" position marker by using only the ledger id.
Since the C++ client is using 48bits to
// store the entryId, it's not able to pass a Long.max() as entryId.
In this case there's no point to require
@@ -65,7 +69,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
private void recoverCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> lastEntryAndCounter =
ledger.getLastPositionAndCounter();
- this.readPosition = ledger.getNextValidPosition(mdPosition);
+ this.readPosition = isReadCompacted() ? mdPosition.getNext() :
ledger.getNextValidPosition(mdPosition);
markDeletePosition = mdPosition;
// Initialize the counter such that the difference between the
messages written on the ML and the
@@ -116,6 +120,22 @@ public class NonDurableCursorImpl extends
ManagedCursorImpl {
callback.deleteCursorComplete(ctx);
}
+ public boolean isReadCompacted() {
+ return readCompacted;
+ }
+
+ @Override
+ public void rewind() {
+ // For reading the compacted data,
+ // we couldn't reset the read position to the next valid position of
the original topic.
+ // Otherwise, the remaining data in the compacted ledger will be
skipped.
+ if (!readCompacted) {
+ super.rewind();
+ } else {
+ readPosition = markDeletePosition.getNext();
+ }
+ }
+
@Override
public synchronized String toString() {
return MoreObjects.toStringHelper(this).add("ledger",
ledger.getName()).add("ackPos", markDeletePosition)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 2ab83834d1e..5d486528f7f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
import org.apache.pulsar.broker.ServiceConfiguration;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -38,8 +37,10 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractDispatcherSingleActiveConsumer extends
AbstractBaseDispatcher {
protected final String topicName;
- protected static final
AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer>
ACTIVE_CONSUMER_UPDATER =
-
AtomicReferenceFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class,
Consumer.class, "activeConsumer");
+ protected static final
AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer>
+ ACTIVE_CONSUMER_UPDATER =
+
AtomicReferenceFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class,
+ Consumer.class, "activeConsumer");
private volatile Consumer activeConsumer = null;
protected final CopyOnWriteArrayList<Consumer> consumers;
protected StickyKeyConsumerSelector stickyKeyConsumerSelector;
@@ -146,7 +147,8 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
if (subscriptionType == SubType.Failover &&
isConsumersExceededOnSubscription()) {
- log.warn("[{}] Attempting to add consumer to subscription which
reached max consumers limit", this.topicName);
+ log.warn("[{}] Attempting to add consumer to subscription which
reached max consumers limit",
+ this.topicName);
throw new ConsumerBusyException("Subscription reached max
consumers limit");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c850aa8d281..67f17d4a4c3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -662,7 +662,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
CompletableFuture<? extends Subscription> subscriptionFuture =
isDurable ? //
getDurableSubscription(subscriptionName, initialPosition,
startMessageRollbackDurationSec, replicatedSubscriptionState) //
- : getNonDurableSubscription(subscriptionName, startMessageId,
initialPosition, startMessageRollbackDurationSec);
+ : getNonDurableSubscription(subscriptionName, startMessageId,
initialPosition, startMessageRollbackDurationSec, readCompacted);
int maxUnackedMessages = isDurable
? getMaxUnackedMessagesOnConsumer()
@@ -776,7 +776,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
private CompletableFuture<? extends Subscription>
getNonDurableSubscription(String subscriptionName,
- MessageId startMessageId, InitialPosition initialPosition, long
startMessageRollbackDurationSec) {
+ MessageId startMessageId, InitialPosition initialPosition, long
startMessageRollbackDurationSec,
+ boolean isReadCompacted) {
log.info("[{}][{}] Creating non-durable subscription at msg id {}",
topic, subscriptionName, startMessageId);
CompletableFuture<Subscription> subscriptionFuture = new
CompletableFuture<>();
@@ -809,7 +810,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
- cursor = ledger.newNonDurableCursor(startPosition,
subscriptionName, initialPosition);
+ cursor = ledger.newNonDurableCursor(startPosition,
subscriptionName, initialPosition,
+ isReadCompacted);
} catch (ManagedLedgerException e) {
return FutureUtil.failedFuture(e);
}