This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b5ce6f0aa185d7276e38972371bd170a6b9b68cc
Author: Oneby Wang <[email protected]>
AuthorDate: Fri Jan 2 21:27:55 2026 +0800

    [improve][admin] Add counter for marker messages in 
PersistentTopics.analyzeSubscriptionBacklog() rest api (#25091)
    
    (cherry picked from commit 75658cc353e74e132ef30090767fe062eae9ca05)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 +
 .../broker/service/AnalyzeBacklogResult.java       |  1 +
 .../service/persistent/PersistentSubscription.java |  6 +++
 .../admin/AnalyzeBacklogSubscriptionTest.java      |  1 +
 .../broker/admin/v3/AdminApiTransactionTest.java   | 59 ++++++++++++++++++++++
 .../broker/service/ReplicatedSubscriptionTest.java |  8 +++
 .../broker/service/plugin/FilterEntryTest.java     | 11 ++--
 .../stats/AnalyzeSubscriptionBacklogResult.java    |  1 +
 8 files changed, 83 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4d28c5c76cf..5fa9bbb80f2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1693,6 +1693,7 @@ public class PersistentTopicsBase extends AdminResource {
 
                         result.setEntries(rawResult.getEntries());
                         result.setMessages(rawResult.getMessages());
+                        
result.setMarkerMessages(rawResult.getMarkerMessages());
 
                         
result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
                         
result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
index e227acf4e8f..b9c279f97ce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
@@ -29,6 +29,7 @@ public final class AnalyzeBacklogResult {
 
     private long entries;
     private long messages;
+    private long markerMessages;
 
     private long filterRejectedEntries;
     private long filterAcceptedEntries;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5eb06b87628..9e821925e28 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -622,6 +622,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
         AtomicLong rejected = new AtomicLong();
         AtomicLong rescheduled = new AtomicLong();
         AtomicLong messages = new AtomicLong();
+        AtomicLong markerMessages = new AtomicLong();
         AtomicLong acceptedMessages = new AtomicLong();
         AtomicLong rejectedMessages = new AtomicLong();
         AtomicLong rescheduledMessages = new AtomicLong();
@@ -649,6 +650,10 @@ public class PersistentSubscription extends 
AbstractSubscription {
             lastPosition.set(entryPosition);
             ByteBuf metadataAndPayload = entry.getDataBuffer();
             MessageMetadata messageMetadata = 
Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+            if (messageMetadata.hasMarkerType()) {
+                markerMessages.incrementAndGet();
+                return true;
+            }
             int numMessages = 1;
             if (messageMetadata.hasNumMessagesInBatch()) {
                 numMessages = messageMetadata.getNumMessagesInBatch();
@@ -698,6 +703,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
             result.setLastPosition(lastPosition.get());
             result.setEntries(entries.get());
             result.setMessages(messages.get());
+            result.setMarkerMessages(markerMessages.get());
             result.setFilterAcceptedEntries(accepted.get());
             result.setFilterAcceptedMessages(acceptedMessages.get());
             result.setFilterRejectedEntries(rejected.get());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index dbc632c6cf3..acea9132049 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -161,6 +161,7 @@ public class AnalyzeBacklogSubscriptionTest extends 
ProducerConsumerBase {
         
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
 
         assertEquals(analyzeSubscriptionBacklogResult.getMessages(), 
numMessages);
+        assertEquals(analyzeSubscriptionBacklogResult.getMarkerMessages(), 0);
         
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), 
numMessages);
         
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index b567fa21ef8..7323b57886b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.fail;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -79,6 +80,7 @@ import 
org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionMetadata;
 import 
org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
 import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import 
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
@@ -1054,6 +1056,63 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws 
Exception {
+        initTransaction(1);
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/analyze-subscription-backlog");
+        String transactionSubName = "analyze-subscription-backlog-topic-sub";
+
+        // Init subscription and then close the consumer. If consumer is 
connected and has available permits,
+        // AbstractBaseDispatcher#filterEntriesForConsumer will auto ack 
marker messages
+        
pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe().close();
+        @Cleanup Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        int numMessages = 10;
+        List<MessageId> committedMsgIds = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            Transaction txn = pulsarClient.newTransaction().build().get();
+            MessageId messageId = 
producer.newMessage(txn).value("commited-msg" + i).send();
+            committedMsgIds.add(messageId);
+            txn.commit().get();
+        }
+
+        AnalyzeSubscriptionBacklogResult backlogResult =
+                admin.topics().analyzeSubscriptionBacklog(topic, 
transactionSubName, Optional.empty());
+        assertEquals(backlogResult.getMessages(), numMessages);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages);
+
+        MessageId committedMiddleMsgId = committedMsgIds.get(numMessages / 2);
+        backlogResult =
+                admin.topics().analyzeSubscriptionBacklog(topic, 
transactionSubName, Optional.of(committedMiddleMsgId));
+        assertEquals(backlogResult.getMessages(), numMessages / 2);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages / 2);
+
+        List<MessageId> abortedMsgIds = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            Transaction txn = pulsarClient.newTransaction().build().get();
+            MessageId messageId = producer.newMessage(txn).value("aborted-msg" 
+ i).send();
+            abortedMsgIds.add(messageId);
+            txn.abort();
+        }
+        backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, 
transactionSubName, Optional.empty());
+        assertEquals(backlogResult.getMessages(), numMessages * 2);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages * 2);
+
+        MessageId abortedMiddleMsgId = abortedMsgIds.get(numMessages / 2);
+        backlogResult =
+                admin.topics().analyzeSubscriptionBacklog(topic, 
transactionSubName, Optional.of(abortedMiddleMsgId));
+        assertEquals(backlogResult.getMessages(), numMessages / 2);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages / 2);
+
+        Transaction txn = pulsarClient.newTransaction().build().get();
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage(txn).value("uncommitted-msg-" + i).send();
+        }
+        backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, 
transactionSubName, Optional.empty());
+        assertEquals(backlogResult.getMessages(), numMessages * 3);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages * 2);
+    }
+
     private static void verifyCoordinatorStats(String state,
                                                long sequenceId, long 
lowWaterMark) {
         assertEquals(state, "Ready");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index 74dbc5e5291..c538207584d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -39,6 +39,7 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -70,6 +71,7 @@ import 
org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1090,6 +1092,12 @@ public class ReplicatedSubscriptionTest extends 
ReplicatorTestBase {
         }
         Assert.assertEquals(numSnapshotRequest, 1);
 
+        // Assert analyze backlog total messages and marker messages.
+        AnalyzeSubscriptionBacklogResult backlogResult =
+                admin4.topics().analyzeSubscriptionBacklog(topicName, 
subscriptionName, Optional.empty());
+        assertEquals(backlogResult.getMessages(), numMessages);
+        assertEquals(backlogResult.getMarkerMessages(), numSnapshotRequest);
+
         // Wait pending snapshot timeout
         
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000);
         numSnapshotRequest = 0;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 89b409ae581..a70c2f3cf8d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -253,7 +253,7 @@ public class FilterEntryTest extends BrokerTestBase {
             producer.send("test");
         }
 
-        verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0);
+        verifyBacklog(topic, subName, 10, 10, 0, 10, 10, 0, 0, 0, 0);
 
         int counter = 0;
         while (true) {
@@ -268,7 +268,7 @@ public class FilterEntryTest extends BrokerTestBase {
         // All normal messages can be received
         assertEquals(10, counter);
 
-        verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
+        verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         // stop the consumer
         consumer.close();
@@ -280,7 +280,7 @@ public class FilterEntryTest extends BrokerTestBase {
 
         // analyze the subscription and predict that
         // 10 messages will be rejected by the filter
-        verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0);
+        verifyBacklog(topic, subName, 10, 10, 0, 0, 0, 10, 10, 0, 0);
 
         consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
@@ -304,7 +304,7 @@ public class FilterEntryTest extends BrokerTestBase {
 
         // now the Filter acknoledged the messages on behalf of the Consumer
         // backlog is now zero again
-        verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
+        verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         // All messages should be acked, check the MarkDeletedPosition
         assertNotNull(lastMsgId);
@@ -545,7 +545,7 @@ public class FilterEntryTest extends BrokerTestBase {
 
 
     private void verifyBacklog(String topic, String subscription,
-                               int numEntries, int numMessages,
+                               int numEntries, int numMessages, int 
numMarkerMessages,
                                int numEntriesAccepted, int numMessagesAccepted,
                                int numEntriesRejected, int numMessagesRejected,
                                int numEntriesRescheduled, int 
numMessagesRescheduled
@@ -559,6 +559,7 @@ public class FilterEntryTest extends BrokerTestBase {
         Assert.assertEquals(numEntriesRescheduled, 
a1.getFilterRescheduledEntries());
 
         Assert.assertEquals(numMessages, a1.getMessages());
+        Assert.assertEquals(numMarkerMessages, a1.getMarkerMessages());
         Assert.assertEquals(numMessagesAccepted, 
a1.getFilterAcceptedMessages());
         Assert.assertEquals(numMessagesRejected, 
a1.getFilterRejectedMessages());
         Assert.assertEquals(numMessagesRescheduled, 
a1.getFilterRescheduledMessages());
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
index 059026b80c5..622d91e4e98 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
@@ -26,6 +26,7 @@ import lombok.ToString;
 public class AnalyzeSubscriptionBacklogResult {
     private long entries;
     private long messages;
+    private long markerMessages;
 
     private long filterRejectedEntries;
     private long filterAcceptedEntries;

Reply via email to