codelipenghui commented on a change in pull request #14531:
URL: https://github.com/apache/pulsar/pull/14531#discussion_r819587689



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
##########
@@ -159,4 +155,52 @@ public void testFilter() throws Exception {
 
     }
 
+
+    @Test
+    public void testFilteredMsgCount() throws Throwable{
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)

Review comment:
       ```suggestion
           @Cleanup
           Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
   ```

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
##########
@@ -159,4 +155,52 @@ public void testFilter() throws Exception {
 
     }
 
+
+    @Test
+    public void testFilteredMsgCount() throws Throwable{
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subName).subscribe();
+        // mock entry filters
+        PersistentSubscription subscription = (PersistentSubscription) 
pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subName);
+        Dispatcher dispatcher = subscription.getDispatcher();
+        Field field = 
AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, 
narClassLoader);
+        EntryFilter filter2 = new EntryFilter2Test();
+        EntryFilterWithClassLoader loader2 = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, 
narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)

Review comment:
       ```suggestion
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
   ```

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
##########
@@ -32,10 +31,7 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.*;

Review comment:
       Please avoid the star import; keep the explicit imports instead.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
##########
@@ -159,4 +155,52 @@ public void testFilter() throws Exception {
 
     }
 
+
+    @Test
+    public void testFilteredMsgCount() throws Throwable{
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subName).subscribe();
+        // mock entry filters
+        PersistentSubscription subscription = (PersistentSubscription) 
pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subName);
+        Dispatcher dispatcher = subscription.getDispatcher();
+        Field field = 
AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, 
narClassLoader);
+        EntryFilter filter2 = new EntryFilter2Test();
+        EntryFilterWithClassLoader loader2 = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, 
narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topic).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("test");
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertNotNull(producer.newMessage().property("REJECT", 
"").value("1").send());
+        }
+
+
+        int counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);

Review comment:
       1 second might be too short in the CI environment if the broker 
dispatches messages slowly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to