This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ce89b53add4 Add filteredMsgCount for `pulsar-admin topics stats`
(#14531)
ce89b53add4 is described below
commit ce89b53add49818afb5fb310ecb8bed6164b9079
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Mar 15 11:41:40 2022 +0800
Add filteredMsgCount for `pulsar-admin topics stats` (#14531)
(cherry picked from commit d3ccd4ae2bfef4a401cec68daeee47e67f7a03d5)
---
.../broker/service/AbstractBaseDispatcher.java | 6 +++
.../pulsar/broker/service/AbstractTopic.java | 9 ++++
.../service/nonpersistent/NonPersistentTopic.java | 1 +
.../broker/service/persistent/PersistentTopic.java | 1 +
.../broker/service/plugin/FilterEntryTest.java | 52 ++++++++++++++++++++++
5 files changed, 69 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 4a550e48341..d14f683070b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -220,6 +220,12 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
subscription.acknowledgeMessage(entriesToFiltered,
AckType.Individual,
Collections.emptyMap());
+
+ int filtered = entriesToFiltered.size();
+ Topic topic = subscription.getTopic();
+ if (topic instanceof AbstractTopic) {
+ ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
+ }
}
sendMessageInfo.setTotalMessages(totalMessages);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f9e31bbf706..852185d4919 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -119,6 +119,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();
+ private final LongAdder filteredEntriesCounter = new LongAdder();
private static final AtomicLongFieldUpdater<AbstractTopic>
RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class,
"publishRateLimitedTimes");
@@ -1121,4 +1122,12 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}
+
+ public void addFilteredEntriesCount(int filtered) {
+ this.filteredEntriesCounter.add(filtered);
+ }
+
+ public long getFilteredEntriesCount() {
+ return this.filteredEntriesCounter.longValue();
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3818b8abc2e..2ca106e7efb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -791,6 +791,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
topicStatsStream.writePair("bytesInCount", getBytesInCounter());
topicStatsStream.writePair("msgOutCount", getMsgOutCounter());
topicStatsStream.writePair("bytesOutCount", getBytesOutCounter());
+ topicStatsStream.writePair("filteredEntriesCount",
getFilteredEntriesCount());
nsStats.msgRateIn += topicStats.aggMsgRateIn;
nsStats.msgRateOut += topicStats.aggMsgRateOut;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5a970b48f58..a36176cdb45 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1813,6 +1813,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
topicStatsStream.writePair("storageSize", ledger.getTotalSize());
topicStatsStream.writePair("backlogSize",
ledger.getEstimatedBacklogSize());
topicStatsStream.writePair("pendingAddEntriesCount",
((ManagedLedgerImpl) ledger).getPendingAddEntriesCount());
+ topicStatsStream.writePair("filteredEntriesCount",
getFilteredEntriesCount());
nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index daf773c8019..24b73ab8890 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -33,6 +33,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
@@ -159,4 +160,55 @@ public class FilterEntryTest extends BrokerTestBase {
}
+
+ @Test
+ public void testFilteredMsgCount() throws Throwable {
+ String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+ String subName = "sub";
+
+ try (Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topic).create();
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+ .subscriptionName(subName).subscribe()) {
+
+ // mock entry filters
+ PersistentSubscription subscription = (PersistentSubscription)
pulsar.getBrokerService()
+ .getTopicReference(topic).get().getSubscription(subName);
+ Dispatcher dispatcher = subscription.getDispatcher();
+ Field field =
AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+ field.setAccessible(true);
+ NarClassLoader narClassLoader = mock(NarClassLoader.class);
+ EntryFilter filter1 = new EntryFilterTest();
+ EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1,
narClassLoader);
+ EntryFilter filter2 = new EntryFilter2Test();
+ EntryFilterWithClassLoader loader2 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2,
narClassLoader);
+ field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test");
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertNotNull(producer.newMessage().property("REJECT",
"").value("1").send());
+ }
+
+
+ int counter = 0;
+ while (true) {
+ Message<String> message = consumer.receive(10,
TimeUnit.SECONDS);
+ if (message != null) {
+ counter++;
+ assertEquals(message.getValue(), "test");
+ consumer.acknowledge(message);
+ } else {
+ break;
+ }
+ }
+
+ assertEquals(10, counter);
+ AbstractTopic abstractTopic = (AbstractTopic)
subscription.getTopic();
+ long filtered = abstractTopic.getFilteredEntriesCount();
+ assertEquals(filtered, 10);
+ }
+ }
}