This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ce89b53add4 Add filteredMsgCount for `pulsar-admin topics stats` (#14531)
ce89b53add4 is described below

commit ce89b53add49818afb5fb310ecb8bed6164b9079
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Mar 15 11:41:40 2022 +0800

    Add filteredMsgCount for `pulsar-admin topics stats` (#14531)
    
    (cherry picked from commit d3ccd4ae2bfef4a401cec68daeee47e67f7a03d5)
---
 .../broker/service/AbstractBaseDispatcher.java     |  6 +++
 .../pulsar/broker/service/AbstractTopic.java       |  9 ++++
 .../service/nonpersistent/NonPersistentTopic.java  |  1 +
 .../broker/service/persistent/PersistentTopic.java |  1 +
 .../broker/service/plugin/FilterEntryTest.java     | 52 ++++++++++++++++++++++
 5 files changed, 69 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 4a550e48341..d14f683070b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -220,6 +220,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
             subscription.acknowledgeMessage(entriesToFiltered, 
AckType.Individual,
                     Collections.emptyMap());
+
+            int filtered = entriesToFiltered.size();
+            Topic topic = subscription.getTopic();
+            if (topic instanceof AbstractTopic) {
+                ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
+            }
         }
 
         sendMessageInfo.setTotalMessages(totalMessages);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f9e31bbf706..852185d4919 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -119,6 +119,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
 
     private LongAdder bytesInCounter = new LongAdder();
     private LongAdder msgInCounter = new LongAdder();
+    private final LongAdder filteredEntriesCounter = new LongAdder();
 
     private static final AtomicLongFieldUpdater<AbstractTopic> 
RATE_LIMITED_UPDATER =
             AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, 
"publishRateLimitedTimes");
@@ -1121,4 +1122,12 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
             
subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
     }
+
+    public void addFilteredEntriesCount(int filtered) {
+        this.filteredEntriesCounter.add(filtered);
+    }
+
+    public long getFilteredEntriesCount() {
+        return this.filteredEntriesCounter.longValue();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3818b8abc2e..2ca106e7efb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -791,6 +791,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
         topicStatsStream.writePair("bytesInCount", getBytesInCounter());
         topicStatsStream.writePair("msgOutCount", getMsgOutCounter());
         topicStatsStream.writePair("bytesOutCount", getBytesOutCounter());
+        topicStatsStream.writePair("filteredEntriesCount", 
getFilteredEntriesCount());
 
         nsStats.msgRateIn += topicStats.aggMsgRateIn;
         nsStats.msgRateOut += topicStats.aggMsgRateOut;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5a970b48f58..a36176cdb45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1813,6 +1813,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         topicStatsStream.writePair("storageSize", ledger.getTotalSize());
         topicStatsStream.writePair("backlogSize", 
ledger.getEstimatedBacklogSize());
         topicStatsStream.writePair("pendingAddEntriesCount", 
((ManagedLedgerImpl) ledger).getPendingAddEntriesCount());
+        topicStatsStream.writePair("filteredEntriesCount", 
getFilteredEntriesCount());
 
         nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
         nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index daf773c8019..24b73ab8890 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -33,6 +33,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -159,4 +160,55 @@ public class FilterEntryTest extends BrokerTestBase {
 
     }
 
+
+    @Test
+    public void testFilteredMsgCount() throws Throwable {
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+
+        try (Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topic).create();
+             Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                     .subscriptionName(subName).subscribe()) {
+
+            // mock entry filters
+            PersistentSubscription subscription = (PersistentSubscription) 
pulsar.getBrokerService()
+                    .getTopicReference(topic).get().getSubscription(subName);
+            Dispatcher dispatcher = subscription.getDispatcher();
+            Field field = 
AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+            field.setAccessible(true);
+            NarClassLoader narClassLoader = mock(NarClassLoader.class);
+            EntryFilter filter1 = new EntryFilterTest();
+            EntryFilterWithClassLoader loader1 = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, 
narClassLoader);
+            EntryFilter filter2 = new EntryFilter2Test();
+            EntryFilterWithClassLoader loader2 = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, 
narClassLoader);
+            field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+            for (int i = 0; i < 10; i++) {
+                producer.send("test");
+            }
+
+            for (int i = 0; i < 10; i++) {
+                assertNotNull(producer.newMessage().property("REJECT", 
"").value("1").send());
+            }
+
+
+            int counter = 0;
+            while (true) {
+                Message<String> message = consumer.receive(10, 
TimeUnit.SECONDS);
+                if (message != null) {
+                    counter++;
+                    assertEquals(message.getValue(), "test");
+                    consumer.acknowledge(message);
+                } else {
+                    break;
+                }
+            }
+
+            assertEquals(10, counter);
+            AbstractTopic abstractTopic = (AbstractTopic) 
subscription.getTopic();
+            long filtered = abstractTopic.getFilteredEntriesCount();
+            assertEquals(filtered, 10);
+        }
+    }
 }

Reply via email to