This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b5ce6f0aa185d7276e38972371bd170a6b9b68cc Author: Oneby Wang <[email protected]> AuthorDate: Fri Jan 2 21:27:55 2026 +0800 [improve][admin] Add counter for marker messages in PersistentTopics.analyzeSubscriptionBacklog() rest api (#25091) (cherry picked from commit 75658cc353e74e132ef30090767fe062eae9ca05) --- .../broker/admin/impl/PersistentTopicsBase.java | 1 + .../broker/service/AnalyzeBacklogResult.java | 1 + .../service/persistent/PersistentSubscription.java | 6 +++ .../admin/AnalyzeBacklogSubscriptionTest.java | 1 + .../broker/admin/v3/AdminApiTransactionTest.java | 59 ++++++++++++++++++++++ .../broker/service/ReplicatedSubscriptionTest.java | 8 +++ .../broker/service/plugin/FilterEntryTest.java | 11 ++-- .../stats/AnalyzeSubscriptionBacklogResult.java | 1 + 8 files changed, 83 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4d28c5c76cf..5fa9bbb80f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1693,6 +1693,7 @@ public class PersistentTopicsBase extends AdminResource { result.setEntries(rawResult.getEntries()); result.setMessages(rawResult.getMessages()); + result.setMarkerMessages(rawResult.getMarkerMessages()); result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries()); result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java index e227acf4e8f..b9c279f97ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java @@ -29,6 +29,7 @@ public final class AnalyzeBacklogResult { private long entries; private long messages; + private long markerMessages; private long filterRejectedEntries; private long filterAcceptedEntries; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 5eb06b87628..9e821925e28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -622,6 +622,7 @@ public class PersistentSubscription extends AbstractSubscription { AtomicLong rejected = new AtomicLong(); AtomicLong rescheduled = new AtomicLong(); AtomicLong messages = new AtomicLong(); + AtomicLong markerMessages = new AtomicLong(); AtomicLong acceptedMessages = new AtomicLong(); AtomicLong rejectedMessages = new AtomicLong(); AtomicLong rescheduledMessages = new AtomicLong(); @@ -649,6 +650,10 @@ public class PersistentSubscription extends AbstractSubscription { lastPosition.set(entryPosition); ByteBuf metadataAndPayload = entry.getDataBuffer(); MessageMetadata messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1); + if (messageMetadata.hasMarkerType()) { + markerMessages.incrementAndGet(); + return true; + } int numMessages = 1; if (messageMetadata.hasNumMessagesInBatch()) { numMessages = messageMetadata.getNumMessagesInBatch(); @@ -698,6 +703,7 @@ public class PersistentSubscription extends AbstractSubscription { result.setLastPosition(lastPosition.get()); result.setEntries(entries.get()); result.setMessages(messages.get()); + result.setMarkerMessages(markerMessages.get()); result.setFilterAcceptedEntries(accepted.get()); result.setFilterAcceptedMessages(acceptedMessages.get()); result.setFilterRejectedEntries(rejected.get()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index dbc632c6cf3..acea9132049 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -161,6 +161,7 @@ public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase { assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0); assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages); + assertEquals(analyzeSubscriptionBacklogResult.getMarkerMessages(), 0); assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages); assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index b567fa21ef8..7323b57886b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -31,6 +31,7 @@ import static org.testng.Assert.fail; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -79,6 +80,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionMetadata; import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.apache.pulsar.common.stats.PositionInPendingAckStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; @@ -1054,6 +1056,63 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest { } } + @Test + public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Exception { + initTransaction(1); + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/analyze-subscription-backlog"); + String transactionSubName = "analyze-subscription-backlog-topic-sub"; + + // Init subscription and then close the consumer. If consumer is connected and has available permits, + // AbstractBaseDispatcher#filterEntriesForConsumer will auto ack marker messages + pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe().close(); + @Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + + int numMessages = 10; + List<MessageId> committedMsgIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + MessageId messageId = producer.newMessage(txn).value("commited-msg" + i).send(); + committedMsgIds.add(messageId); + txn.commit().get(); + } + + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages); + assertEquals(backlogResult.getMarkerMessages(), numMessages); + + MessageId committedMiddleMsgId = committedMsgIds.get(numMessages / 2); + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(committedMiddleMsgId)); + assertEquals(backlogResult.getMessages(), numMessages / 2); + assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); + + List<MessageId> abortedMsgIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + MessageId messageId = producer.newMessage(txn).value("aborted-msg" + i).send(); + abortedMsgIds.add(messageId); + txn.abort(); + } + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages * 2); + assertEquals(backlogResult.getMarkerMessages(), numMessages * 2); + + MessageId abortedMiddleMsgId = abortedMsgIds.get(numMessages / 2); + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(abortedMiddleMsgId)); + assertEquals(backlogResult.getMessages(), numMessages / 2); + assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); + + Transaction txn = pulsarClient.newTransaction().build().get(); + for (int i = 0; i < numMessages; i++) { + producer.newMessage(txn).value("uncommitted-msg-" + i).send(); + } + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages * 3); + assertEquals(backlogResult.getMarkerMessages(), numMessages * 2); + } + private static void verifyCoordinatorStats(String state, long sequenceId, long lowWaterMark) { assertEquals(state, "Ready"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java index 74dbc5e5291..c538207584d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java @@ -39,6 +39,7 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -70,6 +71,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1090,6 +1092,12 @@ public class ReplicatedSubscriptionTest extends ReplicatorTestBase { } Assert.assertEquals(numSnapshotRequest, 1); + // Assert analyze backlog total messages and marker messages. + AnalyzeSubscriptionBacklogResult backlogResult = + admin4.topics().analyzeSubscriptionBacklog(topicName, subscriptionName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages); + assertEquals(backlogResult.getMarkerMessages(), numSnapshotRequest); + // Wait pending snapshot timeout Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000); numSnapshotRequest = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 89b409ae581..a70c2f3cf8d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -253,7 +253,7 @@ public class FilterEntryTest extends BrokerTestBase { producer.send("test"); } - verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0); + verifyBacklog(topic, subName, 10, 10, 0, 10, 10, 0, 0, 0, 0); int counter = 0; while (true) { @@ -268,7 +268,7 @@ public class FilterEntryTest extends BrokerTestBase { // All normal messages can be received assertEquals(10, counter); - verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0); + verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0); // stop the consumer consumer.close(); @@ -280,7 +280,7 @@ public class FilterEntryTest extends BrokerTestBase { // analyze the subscription and predict that // 10 messages will be rejected by the filter - verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0); + verifyBacklog(topic, subName, 10, 10, 0, 0, 0, 10, 10, 0, 0); consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) @@ -304,7 +304,7 @@ public class FilterEntryTest extends BrokerTestBase { // now the Filter acknoledged the messages on behalf of the Consumer // backlog is now zero again - verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0); + verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0); // All messages should be acked, check the MarkDeletedPosition assertNotNull(lastMsgId); @@ -545,7 +545,7 @@ public class FilterEntryTest extends BrokerTestBase { private void verifyBacklog(String topic, String subscription, - int numEntries, int numMessages, + int numEntries, int numMessages, int numMarkerMessages, int numEntriesAccepted, int numMessagesAccepted, int numEntriesRejected, int numMessagesRejected, int numEntriesRescheduled, int numMessagesRescheduled @@ -559,6 +559,7 @@ public class FilterEntryTest extends BrokerTestBase { Assert.assertEquals(numEntriesRescheduled, a1.getFilterRescheduledEntries()); Assert.assertEquals(numMessages, a1.getMessages()); + Assert.assertEquals(numMarkerMessages, a1.getMarkerMessages()); Assert.assertEquals(numMessagesAccepted, a1.getFilterAcceptedMessages()); Assert.assertEquals(numMessagesRejected, a1.getFilterRejectedMessages()); Assert.assertEquals(numMessagesRescheduled, a1.getFilterRescheduledMessages()); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java index 059026b80c5..622d91e4e98 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java @@ -26,6 +26,7 @@ import lombok.ToString; public class AnalyzeSubscriptionBacklogResult { private long entries; private long messages; + private long markerMessages; private long filterRejectedEntries; private long filterAcceptedEntries;
