This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new f7320d0436a [fix][broker] Fix entry filter feature for the non-persistent topic (#20141)
f7320d0436a is described below
commit f7320d0436af0fa13fc817b1fb9d2e60dba7515b
Author: ran <[email protected]>
AuthorDate: Thu Apr 20 16:18:06 2023 +0800
[fix][broker] Fix entry filter feature for the non-persistent topic (#20141)
---
.../service/nonpersistent/NonPersistentTopic.java | 5 +--
.../broker/service/plugin/FilterEntryTest.java | 15 +++++---
.../pulsar/broker/stats/SubscriptionStatsTest.java | 42 +++++++++++-----------
3 files changed, 36 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b178e99b5c8..958f198dccd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -188,7 +188,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
// entry internally retains data so, duplicateBuffer should be
release here
duplicateBuffer.release();
if (subscription.getDispatcher() != null) {
- subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
+ // Dispatcher needs to call the set method to support entry filter feature.
+ subscription.getDispatcher().sendMessages(Arrays.asList(entry));
} else {
// it happens when subscription is created but dispatcher is
not created as consumer is not added
// yet
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index ee100f3fdee..53a62a0e556 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
+import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -194,10 +195,16 @@ public class FilterEntryTest extends BrokerTestBase {
}
+ @DataProvider(name = "topicProvider")
+ public Object[][] topicProvider() {
+ return new Object[][]{
+ {"persistent://prop/ns-abc/topic" + UUID.randomUUID()},
+ {"non-persistent://prop/ns-abc/topic" + UUID.randomUUID()},
+ };
+ }
- @Test
- public void testFilteredMsgCount() throws Throwable {
- String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+ @Test(dataProvider = "topicProvider")
+ public void testFilteredMsgCount(String topic) throws Throwable {
String subName = "sub";
try (Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
@@ -206,7 +213,7 @@ public class FilterEntryTest extends BrokerTestBase {
.subscriptionName(subName).subscribe()) {
// mock entry filters
- PersistentSubscription subscription = (PersistentSubscription)
pulsar.getBrokerService()
+ Subscription subscription = pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Dispatcher dispatcher = subscription.getDispatcher();
Field field =
EntryFilterSupport.class.getDeclaredField("entryFilters");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index b1b865727a0..64d0acf6cc0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -190,21 +190,22 @@ public class SubscriptionStatsTest extends
ProducerConsumerBase {
field.set(dispatcher, ImmutableList.of(loader1));
}
- for (int i = 0; i < 100; i++) {
- producer.newMessage().property("ACCEPT", "
").value(UUID.randomUUID().toString()).send();
- }
- for (int i = 0; i < 100; i++) {
+ int rejectedCount = 100;
+ int acceptCount = 100;
+ int scheduleCount = 100;
+ for (int i = 0; i < rejectedCount; i++) {
producer.newMessage().property("REJECT", "
").value(UUID.randomUUID().toString()).send();
}
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < acceptCount; i++) {
+ producer.newMessage().property("ACCEPT", "
").value(UUID.randomUUID().toString()).send();
+ }
+ for (int i = 0; i < scheduleCount; i++) {
producer.newMessage().property("RESCHEDULE", "
").value(UUID.randomUUID().toString()).send();
}
- for (;;) {
- Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
- if (message == null) {
- break;
- }
+ for (int i = 0; i < acceptCount; i++) {
+ Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
consumer.acknowledge(message);
}
@@ -242,12 +243,12 @@ public class SubscriptionStatsTest extends
ProducerConsumerBase {
.mapToDouble(m-> m.value).sum();
if (setFilter) {
- Assert.assertEquals(filterAccepted, 100);
- if (isPersistent) {
- Assert.assertEquals(filterRejected, 100);
- // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
- Assert.assertEquals(throughFilter, filterAccepted +
filterRejected + filterRescheduled, 0.01 * throughFilter);
- }
+ Assert.assertEquals(filterAccepted, acceptCount);
+ Assert.assertEquals(filterRejected, rejectedCount);
+ // Only works on the test, if there are some markers,
+ // the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
+ Assert.assertEquals(throughFilter,
+ filterAccepted + filterRejected + filterRescheduled,
0.01 * throughFilter);
} else {
Assert.assertEquals(throughFilter, 0D);
Assert.assertEquals(filterAccepted, 0D);
@@ -261,19 +262,20 @@ public class SubscriptionStatsTest extends
ProducerConsumerBase {
Assert.assertEquals(rescheduledMetrics.size(), 0);
}
- testSubscriptionStatsAdminApi(topic, subName, setFilter);
+ testSubscriptionStatsAdminApi(topic, subName, setFilter, acceptCount,
rejectedCount);
}
- private void testSubscriptionStatsAdminApi(String topic, String subName,
boolean setFilter) throws Exception {
+ private void testSubscriptionStatsAdminApi(String topic, String subName,
boolean setFilter,
+ int acceptCount, int
rejectedCount) throws Exception {
boolean persistent = TopicName.get(topic).isPersistent();
TopicStats topicStats = admin.topics().getStats(topic);
SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
Assert.assertNotNull(stats);
if (setFilter) {
- Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
+ Assert.assertEquals(stats.getFilterAcceptedMsgCount(),
acceptCount);
if (persistent) {
- Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
+ Assert.assertEquals(stats.getFilterRejectedMsgCount(),
rejectedCount);
// Only works on the test, if there are some markers, the
filterProcessCount will be not equal with rejectedCount + rescheduledCount +
acceptCount
Assert.assertEquals(stats.getFilterProcessedMsgCount(),
stats.getFilterAcceptedMsgCount() +
stats.getFilterRejectedMsgCount()