This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 54da0c8cef5 [fix][broker] BacklogMessageAge is not reset when cursor
mdPosition is on an open ledger (#24915)
54da0c8cef5 is described below
commit 54da0c8cef54480b09272db34e770894ff0ef474
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Oct 31 18:29:02 2025 +0800
[fix][broker] BacklogMessageAge is not reset when cursor mdPosition is on
an open ledger (#24915)
---
.../broker/service/persistent/PersistentTopic.java | 2 +
.../broker/service/BacklogQuotaManagerTest.java | 55 +++++++++++++++++-----
2 files changed, 44 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4b28171ea99..1f424df17a1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3877,6 +3877,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
oldestMarkDeleteCursorInfo.getCursor().getName(),
checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(),
oldestMarkDeleteCursorInfo.getVersion()));
+ } else {
+ TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.set(this,
null);
}
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 3c8dcfdc12f..19383d50f2e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -46,7 +46,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -151,6 +153,7 @@ public class BacklogQuotaManagerTest {
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ config.setManagedLedgerDefaultMarkDeleteRateLimit(1000);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setSystemTopicEnabled(true);
config.setTopicLevelPoliciesEnabled(true);
@@ -704,7 +707,7 @@ public class BacklogQuotaManagerTest {
final String topic1 = "persistent://prop/ns-quota/topic2" +
UUID.randomUUID();
final String subName1 = "c1";
- final int numMsgs = 5;
+ final int numMsgs = 7;
Consumer<byte[]> consumer1 =
client.newConsumer().topic(topic1).subscriptionName(subName1)
.acknowledgmentGroupTime(0, SECONDS)
@@ -712,27 +715,35 @@ public class BacklogQuotaManagerTest {
Producer<byte[]> producer = createProducer(client, topic1);
byte[] content = new byte[1024];
+ // 1. Send messages
+ // The managed ledger max entries per ledger is 5, so we send 7 messages
to make sure we have multiple ledgers.
+ // When msg 4 is sent, the first ledger is closed.
+ // Second: 1 2 3 4 5 6 7
+ // msg idx: [0 1 2 3 4] [5 6]
for (int i = 0; i < numMsgs; i++) {
- Thread.sleep(3000); // Guarantees if we use wrong message in
age, to show up in failed test
- producer.send(content);
+ Thread.sleep(1000);
+ MessageId send = producer.send(content);
}
+ long lastLedgerCloseTime = System.currentTimeMillis() - 2000;
+ // 2. Receive msg-0 and ack it.
+ String c1MarkDeletePositionBefore =
+
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;
Message<byte[]> oldestMessage = consumer1.receive();
consumer1.acknowledge(oldestMessage);
- log.info("Moved subscription 1, by 1 message");
-
- // Unload topic to trigger the ledger close
- unloadAndLoadTopic(topic1, producer);
- long unloadTime = System.currentTimeMillis();
- waitForQuotaCheckToRunTwice();
+ c1MarkDeletePositionBefore =
waitForMarkDeletePositionToChange(topic1, subName1,
+ c1MarkDeletePositionBefore);
+ log.info("Moved subscription 1, by 1 message {}",
oldestMessage.getMessageId());
- Metrics metrics = prometheusMetricsClient.getMetrics();
+ // 3. The oldestBacklogMessageAgeSeconds is expected to be based on the last
ledger close time
+ long expectedMessageAgeSeconds =
+ MILLISECONDS.toSeconds(System.currentTimeMillis() -
lastLedgerCloseTime);
+ PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic1).get();
+ topicRef.updateOldPositionInfo();
TopicStats topicStats = getTopicStats(topic1);
-
- long expectedMessageAgeSeconds =
MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime);
assertThat(topicStats.getOldestBacklogMessageAgeSeconds())
.isCloseTo(expectedMessageAgeSeconds, within(1L));
-
+ Metrics metrics = prometheusMetricsClient.getMetrics();
Metric backlogAgeMetric =
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
Pair.of("topic", topic1));
@@ -741,6 +752,24 @@ public class BacklogQuotaManagerTest {
entry("namespace", namespace),
entry("topic", topic1));
assertThat((long)
backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L));
+
+ // 4. Move consumer to `end - 1`, then
OldestBacklogMessageAgeSeconds should be `-1`, because the
+ // second ledger is not closed yet.
+ for (int i = 1; i < numMsgs - 1; i++) {
+ Message<byte[]> msg = consumer1.receive();
+ consumer1.acknowledge(msg);
+ }
+ waitForMarkDeletePositionToChange(topic1, subName1,
c1MarkDeletePositionBefore);
+ ManagedCursorContainer cursors = (ManagedCursorContainer)
topicRef.getManagedLedger().getCursors();
+ ManagedCursor subCursor = cursors.get(subName1);
+ Awaitility.await().pollInterval(100, MILLISECONDS).atMost(5,
SECONDS).until(
+ () ->
subCursor.getMarkDeletedPosition().equals(subCursor.getPersistentMarkDeletedPosition()));
+ topicRef.updateOldPositionInfo();
+ topicStats = getTopicStats(topic1, true);
+
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog())
+ .isEqualTo(1L);
+ assertThat(topicStats.getOldestBacklogMessageAgeSeconds())
+ .isEqualTo(-1L);
}
}