This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 07aa1aed92bc87fa5d7e2e9a41df77231bb50b11
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Oct 31 18:29:02 2025 +0800

    [fix][broker] BacklogMessageAge is not reset when cursor mdPosition is on an open ledger (#24915)
    
    (cherry picked from commit 54da0c8cef54480b09272db34e770894ff0ef474)
---
 .../broker/service/persistent/PersistentTopic.java |  2 +
 .../broker/service/BacklogQuotaManagerTest.java    | 55 +++++++++++++++++-----
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4b28171ea99..1f424df17a1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3877,6 +3877,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                     
oldestMarkDeleteCursorInfo.getCursor().getName(),
                                     
checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(),
                                     oldestMarkDeleteCursorInfo.getVersion()));
+                } else {
+                    TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.set(this, 
null);
                 }
 
                 return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 3c8dcfdc12f..19383d50f2e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -46,7 +46,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -151,6 +153,7 @@ public class BacklogQuotaManagerTest {
             
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
             config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
             config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+            config.setManagedLedgerDefaultMarkDeleteRateLimit(1000);
             config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
             config.setSystemTopicEnabled(true);
             config.setTopicLevelPoliciesEnabled(true);
@@ -704,7 +707,7 @@ public class BacklogQuotaManagerTest {
             final String topic1 = "persistent://prop/ns-quota/topic2" + 
UUID.randomUUID();
 
             final String subName1 = "c1";
-            final int numMsgs = 5;
+            final int numMsgs = 7;
 
             Consumer<byte[]> consumer1 = 
client.newConsumer().topic(topic1).subscriptionName(subName1)
                     .acknowledgmentGroupTime(0, SECONDS)
@@ -712,27 +715,35 @@ public class BacklogQuotaManagerTest {
             Producer<byte[]> producer = createProducer(client, topic1);
 
             byte[] content = new byte[1024];
+            // 1. Send messages
+            // The managed ledger max entries per ledger is 5, so we send 7 messages to make sure we have multiple ledgers.
+            // When msg 4 is sent, the first ledger is closed.
+            // Second:     1  2  3  4  5     6   7
+            // msg idx:   [0  1  2  3  4]   [5   6]
             for (int i = 0; i < numMsgs; i++) {
-                Thread.sleep(3000); // Guarantees if we use wrong message in 
age, to show up in failed test
-                producer.send(content);
+                Thread.sleep(1000);
+                MessageId send = producer.send(content);
             }
+            long lastLedgerCloseTime = System.currentTimeMillis() - 2000;
 
+            // 2. Receive msg-0 and ack it.
+            String c1MarkDeletePositionBefore =
+                    
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;
             Message<byte[]> oldestMessage = consumer1.receive();
             consumer1.acknowledge(oldestMessage);
-            log.info("Moved subscription 1, by 1 message");
-
-            // Unload topic to trigger the ledger close
-            unloadAndLoadTopic(topic1, producer);
-            long unloadTime = System.currentTimeMillis();
-            waitForQuotaCheckToRunTwice();
+            c1MarkDeletePositionBefore = 
waitForMarkDeletePositionToChange(topic1, subName1,
+                    c1MarkDeletePositionBefore);
+            log.info("Moved subscription 1, by 1 message {}", 
oldestMessage.getMessageId());
 
-            Metrics metrics = prometheusMetricsClient.getMetrics();
+            // 3. The expected oldestBacklogMessageAgeSeconds is based on the last ledger close time
+            long expectedMessageAgeSeconds =
+                    MILLISECONDS.toSeconds(System.currentTimeMillis() - 
lastLedgerCloseTime);
+            PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic1).get();
+            topicRef.updateOldPositionInfo();
             TopicStats topicStats = getTopicStats(topic1);
-
-            long expectedMessageAgeSeconds = 
MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime);
             assertThat(topicStats.getOldestBacklogMessageAgeSeconds())
                     .isCloseTo(expectedMessageAgeSeconds, within(1L));
-
+            Metrics metrics = prometheusMetricsClient.getMetrics();
             Metric backlogAgeMetric =
                     
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
                             Pair.of("topic", topic1));
@@ -741,6 +752,24 @@ public class BacklogQuotaManagerTest {
                     entry("namespace", namespace),
                     entry("topic", topic1));
             assertThat((long) 
backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L));
+
+            // 4. Move consumer to `end - 1`, then 
OldestBacklogMessageAgeSeconds should be `-1`, because the
+            // second ledger is not closed yet.
+            for (int i = 1; i < numMsgs - 1; i++) {
+                Message<byte[]> msg = consumer1.receive();
+                consumer1.acknowledge(msg);
+            }
+            waitForMarkDeletePositionToChange(topic1, subName1, 
c1MarkDeletePositionBefore);
+            ManagedCursorContainer cursors = (ManagedCursorContainer) 
topicRef.getManagedLedger().getCursors();
+            ManagedCursor subCursor = cursors.get(subName1);
+            Awaitility.await().pollInterval(100, MILLISECONDS).atMost(5, 
SECONDS).until(
+                    () -> 
subCursor.getMarkDeletedPosition().equals(subCursor.getPersistentMarkDeletedPosition()));
+            topicRef.updateOldPositionInfo();
+            topicStats = getTopicStats(topic1, true);
+            
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog())
+                    .isEqualTo(1L);
+            assertThat(topicStats.getOldestBacklogMessageAgeSeconds())
+                    .isEqualTo(-1L);
         }
     }
 

Reply via email to