This is an automated email from the ASF dual-hosted git repository.
huangqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7404e0d77d9 [improve][broker]consumer backlog eviction policy should
not reset read position for consumer (#18037)
7404e0d77d9 is described below
commit 7404e0d77d98c2ec4fab9a111b8ced38d33dcd5a
Author: Qiang Huang <[email protected]>
AuthorDate: Wed Oct 19 17:11:22 2022 +0800
[improve][broker]consumer backlog eviction policy should not reset read
position for consumer (#18037)
### Motivation
Fixes #18036
### Modifications
- The backlog eviction policy should use `markDelete` instead of
`resetCursor` to advance the mark-delete position, so that the slowest
consumer's read position is not reset.
---
.../pulsar/broker/service/BacklogQuotaManager.java | 16 +++++++++++-----
.../pulsar/broker/service/BacklogQuotaManagerTest.java | 9 +++++++--
2 files changed, 18 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 93ae777a89e..210c6f8767a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -210,22 +210,28 @@ public class BacklogQuotaManager {
Long currentMillis = ((ManagedLedgerImpl)
persistentTopic.getManagedLedger()).getClock().millis();
ManagedLedgerImpl mLedger = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
try {
- for (;;) {
+ for (; ; ) {
ManagedCursor slowestConsumer =
mLedger.getSlowestConsumer();
Position oldestPosition =
slowestConsumer.getMarkDeletedPosition();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] slowest consumer mark delete position
is [{}], read position is [{}]",
+ slowestConsumer.getName(), oldestPosition,
slowestConsumer.getReadPosition());
+ }
ManagedLedgerInfo.LedgerInfo ledgerInfo =
mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
if (ledgerInfo == null) {
-
slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl)
oldestPosition));
+ PositionImpl nextPosition =
+
PositionImpl.get(mLedger.getNextValidLedger(oldestPosition.getLedgerId()), -1);
+ slowestConsumer.markDelete(nextPosition);
continue;
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
&& currentMillis - ledgerInfo.getTimestamp() >
quota.getLimitTime() * 1000) {
// skip whole ledger for the slowest cursor
- PositionImpl nextPosition =
mLedger.getNextValidPosition(
- PositionImpl.get(ledgerInfo.getLedgerId(),
ledgerInfo.getEntries() - 1));
+ PositionImpl nextPosition =
+
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
if (!nextPosition.equals(oldestPosition)) {
- slowestConsumer.resetCursor(nextPosition);
+ slowestConsumer.markDelete(nextPosition);
continue;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 9eb6281eddc..97d1798c1e2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -529,18 +530,22 @@ public class BacklogQuotaManagerTest {
assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(),
14);
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(),
14);
+ PersistentTopic topic1Reference = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic1).get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
topic1Reference.getManagedLedger();
+ Position slowConsumerReadPos =
ml.getSlowestConsumer().getReadPosition();
+
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
rolloverStats();
TopicStats stats2 = getTopicStats(topic1);
- PersistentTopic topic1Reference = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic1).get();
- ManagedLedgerImpl ml = (ManagedLedgerImpl)
topic1Reference.getManagedLedger();
// Messages on first 2 ledgers should be expired, backlog is number of
// message in current ledger.
Awaitility.await().untilAsserted(() -> {
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(),
ml.getCurrentLedgerEntries());
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(),
ml.getCurrentLedgerEntries());
});
+
+ assertEquals(ml.getSlowestConsumer().getReadPosition(),
slowConsumerReadPos);
client.close();
}