This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 92ccc406c7e [fix][broker] Fix cursor should use latest ledger config
(#22644)
92ccc406c7e is described below
commit 92ccc406c7ee864588a90200fb3d61a516bc0597
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri May 10 10:37:44 2024 +0800
[fix][broker] Fix cursor should use latest ledger config (#22644)
Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit 774a5d42e8342ee50395cf3626b9e7af27da849e)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 61 +++++++++++-----------
.../mledger/impl/ManagedCursorMXBeanImpl.java | 3 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 +--
.../mledger/impl/NonDurableCursorImpl.java | 5 +-
.../bookkeeper/mledger/impl/OpReadEntry.java | 3 +-
.../bookkeeper/mledger/impl/RangeSetWrapper.java | 2 +-
.../mledger/impl/ReadOnlyCursorImpl.java | 5 +-
.../mledger/impl/ReadOnlyManagedLedgerImpl.java | 2 +-
...ManagedCursorIndividualDeletedMessagesTest.java | 3 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 7 ++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
.../broker/service/BrokerBkEnsemblesTests.java | 8 +--
.../service/persistent/PersistentTopicTest.java | 25 +++++++++
13 files changed, 77 insertions(+), 57 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3671385e60f..35000361eca 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -119,7 +119,6 @@ public class ManagedCursorImpl implements ManagedCursor {
return 0;
};
protected final BookKeeper bookkeeper;
- protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
private final String name;
@@ -299,31 +298,30 @@ public class ManagedCursorImpl implements ManagedCursor {
void operationFailed(ManagedLedgerException exception);
}
- ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config,
ManagedLedgerImpl ledger, String cursorName) {
+ ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String
cursorName) {
this.bookkeeper = bookkeeper;
this.cursorProperties = Collections.emptyMap();
- this.config = config;
this.ledger = ledger;
this.name = cursorName;
this.individualDeletedMessages = new
RangeSetWrapper<>(positionRangeConverter,
positionRangeReverseConverter, this);
- if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
this.batchDeletedIndexes = new ConcurrentSkipListMap<>();
} else {
this.batchDeletedIndexes = null;
}
- this.digestType =
BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
+ this.digestType =
BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType());
STATE_UPDATER.set(this, State.Uninitialized);
PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
PENDING_READ_OPS_UPDATER.set(this, 0);
RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
WAITING_READ_OP_UPDATER.set(this, null);
- this.clock = config.getClock();
+ this.clock = getConfig().getClock();
this.lastActive = this.clock.millis();
this.lastLedgerSwitchTimestamp = this.clock.millis();
- if (config.getThrottleMarkDelete() > 0.0) {
- markDeleteLimiter =
RateLimiter.create(config.getThrottleMarkDelete());
+ if (getConfig().getThrottleMarkDelete() > 0.0) {
+ markDeleteLimiter =
RateLimiter.create(getConfig().getThrottleMarkDelete());
} else {
// Disable mark-delete rate limiter
markDeleteLimiter = null;
@@ -343,7 +341,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public boolean isCursorDataFullyPersistable() {
- return individualDeletedMessages.size() <=
config.getMaxUnackedRangesToPersist();
+ return individualDeletedMessages.size() <=
getConfig().getMaxUnackedRangesToPersist();
}
@Override
@@ -607,7 +605,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
- if (config.isDeletionAtBatchIndexLevelEnabled()
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() >
0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
@@ -616,7 +614,8 @@ public class ManagedCursorImpl implements ManagedCursor {
}, null);
};
try {
- bookkeeper.asyncOpenLedger(ledgerId, digestType,
config.getPassword(), openCallback, null);
+ bookkeeper.asyncOpenLedger(ledgerId, digestType,
getConfig().getPassword(), openCallback,
+ null);
} catch (Throwable t) {
log.error("[{}] Encountered error on opening cursor ledger {} for
cursor {}",
ledger.getName(), ledgerId, name, t);
@@ -973,10 +972,10 @@ public class ManagedCursorImpl implements ManagedCursor {
// Check again for new entries after the configured time, then if
still no entries are available register
// to be notified
- if (config.getNewEntriesCheckDelayInMillis() > 0) {
+ if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
ledger.getScheduledExecutor()
.schedule(() -> checkForNewEntries(op, callback, ctx),
- config.getNewEntriesCheckDelayInMillis(),
TimeUnit.MILLISECONDS);
+ getConfig().getNewEntriesCheckDelayInMillis(),
TimeUnit.MILLISECONDS);
} else {
// If there's no delay, check directly from the same thread
checkForNewEntries(op, callback, ctx);
@@ -1324,7 +1323,7 @@ public class ManagedCursorImpl implements ManagedCursor {
lastMarkDeleteEntry = new
MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
? getProperties() : Collections.emptyMap(), null,
null);
individualDeletedMessages.clear();
- if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
long[] resetWords = newReadPosition.ackSet;
@@ -1583,7 +1582,7 @@ public class ManagedCursorImpl implements ManagedCursor {
lock.readLock().lock();
try {
- if (config.isUnackedRangesOpenCacheSetEnabled()) {
+ if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
int cardinality = individualDeletedMessages.cardinality(
range.lowerEndpoint().ledgerId,
range.lowerEndpoint().entryId,
range.upperEndpoint().ledgerId,
range.upperEndpoint().entryId);
@@ -1963,7 +1962,7 @@ public class ManagedCursorImpl implements ManagedCursor {
PositionImpl newPosition = (PositionImpl) position;
- if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
if (newPosition.ackSet != null) {
AtomicReference<BitSetRecyclable> bitSetRecyclable = new
AtomicReference<>();
BitSetRecyclable givenBitSet =
BitSetRecyclable.create().resetWords(newPosition.ackSet);
@@ -2146,7 +2145,7 @@ public class ManagedCursorImpl implements ManagedCursor {
try {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
- if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
Map<PositionImpl, BitSetRecyclable> subMap =
batchDeletedIndexes.subMap(PositionImpl.EARLIEST,
false,
PositionImpl.get(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId()), true);
@@ -2284,7 +2283,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
if (isMessageDeleted(position)) {
- if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable =
batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
@@ -2296,7 +2295,7 @@ public class ManagedCursorImpl implements ManagedCursor {
continue;
}
if (position.ackSet == null) {
- if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable =
batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
@@ -2313,7 +2312,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] [{}] Individually deleted messages:
{}", ledger.getName(), name,
individualDeletedMessages);
}
- } else if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ } else if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable givenBitSet =
BitSetRecyclable.create().resetWords(position.ackSet);
BitSetRecyclable bitSet =
batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
if (givenBitSet != bitSet) {
@@ -2660,8 +2659,8 @@ public class ManagedCursorImpl implements ManagedCursor {
private boolean shouldPersistUnackRangesToLedger() {
return cursorLedger != null
&& !isCursorLedgerReadOnly
- && config.getMaxUnackedRangesToPersist() > 0
- && individualDeletedMessages.size() >
config.getMaxUnackedRangesToPersistInMetadataStore();
+ && getConfig().getMaxUnackedRangesToPersist() > 0
+ && individualDeletedMessages.size() >
getConfig().getMaxUnackedRangesToPersistInMetadataStore();
}
private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl
position, Map<String, Long> properties,
@@ -2686,7 +2685,7 @@ public class ManagedCursorImpl implements ManagedCursor {
info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
- if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList());
}
}
@@ -2951,7 +2950,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
- ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx)
-> {
+ ledger.asyncCreateLedger(bookkeeper, getConfig(), digestType, (rc, lh,
ctx) -> {
if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
future.complete(null);
@@ -3056,7 +3055,7 @@ public class ManagedCursorImpl implements ManagedCursor {
acksSerializedSize.addAndGet(messageRange.getSerializedSize());
rangeList.add(messageRange);
- return rangeList.size() <=
config.getMaxUnackedRangesToPersist();
+ return rangeList.size() <=
getConfig().getMaxUnackedRangesToPersist();
});
this.individualDeletedMessagesSerializedSize =
acksSerializedSize.get();
@@ -3070,7 +3069,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private List<MLDataFormats.BatchedEntryDeletionIndexInfo>
buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
- if (!config.isDeletionAtBatchIndexLevelEnabled() ||
batchDeletedIndexes.isEmpty()) {
+ if (!getConfig().isDeletionAtBatchIndexLevelEnabled() ||
batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder =
MLDataFormats.NestedPositionInfo
@@ -3079,7 +3078,7 @@ public class ManagedCursorImpl implements ManagedCursor {
.BatchedEntryDeletionIndexInfo.newBuilder();
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = new
ArrayList<>();
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator =
batchDeletedIndexes.entrySet().iterator();
- while (iterator.hasNext() && result.size() <
config.getMaxBatchDeletedIndexToPersist()) {
+ while (iterator.hasNext() && result.size() <
getConfig().getMaxBatchDeletedIndexToPersist()) {
Map.Entry<PositionImpl, BitSetRecyclable> entry =
iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
@@ -3199,8 +3198,8 @@ public class ManagedCursorImpl implements ManagedCursor {
boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
if (ledger.getFactory().isMetadataServiceAvailable()
- && (lh.getLastAddConfirmed() >=
config.getMetadataMaxEntriesPerLedger()
- || lastLedgerSwitchTimestamp < (now -
config.getLedgerRolloverTimeout() * 1000))
+ && (lh.getLastAddConfirmed() >=
getConfig().getMetadataMaxEntriesPerLedger()
+ || lastLedgerSwitchTimestamp < (now -
getConfig().getLedgerRolloverTimeout() * 1000))
&& (STATE_UPDATER.get(this) != State.Closed &&
STATE_UPDATER.get(this) != State.Closing)) {
// It's safe to modify the timestamp since this method will be
only called from a callback, implying that
// calls will be serialized on one single thread
@@ -3556,7 +3555,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
- if (config.isDeletionAtBatchIndexLevelEnabled()) {
+ if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
} else {
@@ -3657,7 +3656,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorImpl.class);
public ManagedLedgerConfig getConfig() {
- return config;
+ return getManagedLedger().getConfig();
}
/***
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
index 48465e6294b..a183c0d61ce 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
@@ -90,7 +90,8 @@ public class ManagedCursorMXBeanImpl implements
ManagedCursorMXBean {
@Override
public void addWriteCursorLedgerSize(final long size) {
- writeCursorLedgerSize.add(size * ((ManagedCursorImpl)
managedCursor).config.getWriteQuorumSize());
+ writeCursorLedgerSize.add(
+ size *
managedCursor.getManagedLedger().getConfig().getWriteQuorumSize());
writeCursorLedgerLogicalSize.add(size);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e5e163127f7..07d0d40569a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -578,7 +578,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
for (final String cursorName : consumers) {
log.info("[{}] Loading cursor {}", name, cursorName);
final ManagedCursorImpl cursor;
- cursor = new ManagedCursorImpl(bookKeeper, config,
ManagedLedgerImpl.this, cursorName);
+ cursor = new ManagedCursorImpl(bookKeeper,
ManagedLedgerImpl.this, cursorName);
cursor.recover(new VoidCallback() {
@Override
@@ -609,7 +609,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.debug("[{}] Recovering cursor {} lazily",
name, cursorName);
}
final ManagedCursorImpl cursor;
- cursor = new ManagedCursorImpl(bookKeeper, config,
ManagedLedgerImpl.this, cursorName);
+ cursor = new ManagedCursorImpl(bookKeeper,
ManagedLedgerImpl.this, cursorName);
CompletableFuture<ManagedCursor> cursorRecoveryFuture
= new CompletableFuture<>();
uninitializedCursors.put(cursorName,
cursorRecoveryFuture);
@@ -991,7 +991,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Creating new cursor: {}", name, cursorName);
}
- final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper,
config, this, cursorName);
+ final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper,
this, cursorName);
CompletableFuture<ManagedCursor> cursorFuture = new
CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
PositionImpl position = InitialPosition.Earliest == initialPosition ?
getFirstPosition() : getLastPosition();
@@ -1124,7 +1124,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return cachedCursor;
}
- NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper,
config, this, cursorName,
+ NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper,
this, cursorName,
(PositionImpl) startCursorPosition, initialPosition,
isReadCompacted);
cursor.setActive();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 77216ce2e45..734eab20bc5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -25,7 +25,6 @@ import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.slf4j.Logger;
@@ -35,10 +34,10 @@ public class NonDurableCursorImpl extends ManagedCursorImpl
{
private final boolean readCompacted;
- NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config,
ManagedLedgerImpl ledger, String cursorName,
+ NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger,
String cursorName,
PositionImpl startCursorPosition,
CommandSubscribe.InitialPosition initialPosition,
boolean isReadCompacted) {
- super(bookkeeper, config, ledger, cursorName);
+ super(bookkeeper, ledger, cursorName);
this.readCompacted = isReadCompacted;
// Compare with "latest" position marker by using only the ledger id.
Since the C++ client is using 48bits to
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index a79ba3fb5e2..534ef3d76cb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -111,7 +111,8 @@ class OpReadEntry implements ReadEntriesCallback {
callback.readEntriesComplete(entries, ctx);
recycle();
});
- } else if (cursor.config.isAutoSkipNonRecoverableData() && exception
instanceof NonRecoverableLedgerException) {
+ } else if (cursor.getConfig().isAutoSkipNonRecoverableData()
+ && exception instanceof NonRecoverableLedgerException) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}",
cursor.ledger.getName(), cursor.getName(),
readPosition, exception.getMessage());
final ManagedLedgerImpl ledger = (ManagedLedgerImpl)
cursor.getManagedLedger();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
index 02e43504482..f235ffc63ac 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
@@ -52,7 +52,7 @@ public class RangeSetWrapper<T extends Comparable<T>>
implements LongPairRangeSe
RangeBoundConsumer<T> rangeBoundConsumer,
ManagedCursorImpl managedCursor) {
requireNonNull(managedCursor);
- this.config = managedCursor.getConfig();
+ this.config = managedCursor.getManagedLedger().getConfig();
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
? new ConcurrentOpenLongPairRangeSet<>(4096, rangeConverter)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 1661613f07d..2461bcf780e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Range;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
@@ -31,9 +30,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@Slf4j
public class ReadOnlyCursorImpl extends ManagedCursorImpl implements
ReadOnlyCursor {
- public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig
config, ManagedLedgerImpl ledger,
+ public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger,
PositionImpl startPosition, String cursorName) {
- super(bookkeeper, config, ledger, cursorName);
+ super(bookkeeper, ledger, cursorName);
if (startPosition.equals(PositionImpl.EARLIEST)) {
readPosition = ledger.getFirstPosition().getNext();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 707b71c9d9f..d8449635999 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -143,7 +143,7 @@ public class ReadOnlyManagedLedgerImpl extends
ManagedLedgerImpl {
}
}
- return new ReadOnlyCursorImpl(bookKeeper, config, this, startPosition,
"read-only-cursor");
+ return new ReadOnlyCursorImpl(bookKeeper, this, startPosition,
"read-only-cursor");
}
@Override
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
index aa0d04783d9..864c25c6c43 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
@@ -56,8 +56,9 @@ public class ManagedCursorIndividualDeletedMessagesTest {
ManagedLedgerImpl ledger = mock(ManagedLedgerImpl.class);
doReturn(ledgersInfo).when(ledger).getLedgersInfo();
+ doReturn(config).when(ledger).getConfig();
- ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper,
config, ledger, "test-cursor"));
+ ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper,
ledger, "test-cursor"));
LongPairRangeSet<PositionImpl> deletedMessages =
cursor.getIndividuallyDeletedMessagesSet();
Method recoverMethod =
ManagedCursorImpl.class.getDeclaredMethod("recoverIndividualDeletedMessages",
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 5c10533e247..4c95454e33a 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3465,10 +3465,10 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
when(ml.getNextValidLedger(markDeleteLedgerId)).thenReturn(3L);
when(ml.getNextValidPosition(lastPosition)).thenReturn(nextPosition);
when(ml.ledgerExists(markDeleteLedgerId)).thenReturn(false);
+ when(ml.getConfig()).thenReturn(new ManagedLedgerConfig());
BookKeeper mockBookKeeper = mock(BookKeeper.class);
- final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper,
new ManagedLedgerConfig(), ml,
- cursorName);
+ final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper,
ml, cursorName);
cursor.recover(new VoidCallback() {
@Override
@@ -4772,8 +4772,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
// Reopen the ledger.
ledger = (ManagedLedgerImpl) factory.open(mlName, config);
BookKeeper mockBookKeeper = mock(BookKeeper.class);
- final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper,
new ManagedLedgerConfig(), ledger,
- cursorName);
+ final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper,
ledger, cursorName);
CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
// Recover the cursor.
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e983523c1b6..122bada487a 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3159,7 +3159,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
// (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
AtomicReference<ManagedLedgerException> responseException2 = new
AtomicReference<>();
PositionImpl readPositionRef = PositionImpl.EARLIEST;
- ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger,
"cursor1");
+ ManagedCursorImpl cursor = new ManagedCursorImpl(bk, ledger,
"cursor1");
OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef,
1, new ReadEntriesCallback() {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 42b9358911a..82892ad353a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -210,10 +210,8 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().iterator().next();
- Field configField = ManagedCursorImpl.class.getDeclaredField("config");
- configField.setAccessible(true);
// Create multiple data-ledger
- ManagedLedgerConfig config = (ManagedLedgerConfig)
configField.get(cursor);
+ ManagedLedgerConfig config = ml.getConfig();
config.setMaxEntriesPerLedger(entriesPerLedger);
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
// bookkeeper client
@@ -323,10 +321,8 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().iterator().next();
- Field configField = ManagedCursorImpl.class.getDeclaredField("config");
- configField.setAccessible(true);
// Create multiple data-ledger
- ManagedLedgerConfig config = (ManagedLedgerConfig)
configField.get(cursor);
+ ManagedLedgerConfig config = ml.getConfig();
config.setMaxEntriesPerLedger(entriesPerLedger);
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
// bookkeeper client
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index d523586c2e2..5b750a0b9c2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -66,6 +66,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.PrometheusMetricsTestUtil;
@@ -754,6 +755,30 @@ public class PersistentTopicTest extends BrokerTestBase {
admin.topics().delete(topicName);
}
+ @Test
+ public void testCursorGetConfigAfterTopicPoliciesChanged() throws
Exception {
+ final String topicName = "persistent://prop/ns-abc/" +
UUID.randomUUID();
+ final String subName = "test_sub";
+
+ @Cleanup
+ Consumer<byte[]> subscribe =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ PersistentSubscription subscription =
persistentTopic.getSubscription(subName);
+
+ int maxConsumers = 100;
+ admin.topicPolicies().setMaxConsumers(topicName, 100);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topicPolicies().getMaxConsumers(topicName,
false), maxConsumers);
+ });
+
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
subscription.getCursor();
+ assertEquals(cursor.getConfig(),
persistentTopic.getManagedLedger().getConfig());
+
+ subscribe.close();
+ admin.topics().delete(topicName);
+ }
+
@Test
public void testAddWaitingCursorsForNonDurable() throws Exception {
final String ns = "prop/ns-test";