This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0975cdc2489 [enh] [broker] EntryFilter (PIP-105) - support 
per-Consumer filtering (#15391)
0975cdc2489 is described below

commit 0975cdc2489739b7518ee7acc78bbc6e8c8e2e4d
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon May 9 13:20:04 2022 +0200

    [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering 
(#15391)
---
 .../broker/service/AbstractBaseDispatcher.java     |  44 ++++++--
 .../NonPersistentDispatcherMultipleConsumers.java  |   2 +-
 ...onPersistentDispatcherSingleActiveConsumer.java |   2 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   2 +-
 .../PersistentDispatcherMultipleConsumers.java     |   2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   3 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   2 +-
 ...entStreamingDispatcherSingleActiveConsumer.java |   2 +-
 .../pulsar/broker/service/plugin/EntryFilter.java  |   4 +
 .../broker/service/plugin/FilterContext.java       |   3 +
 .../broker/service/AbstractBaseDispatcherTest.java |  12 +--
 .../broker/service/plugin/EntryFilterTest.java     |  22 +++-
 .../broker/service/plugin/FilterEntryTest.java     | 116 +++++++++++++++++++++
 13 files changed, 190 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index e7c840b9279..7fc40038c7c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -121,19 +122,20 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
      */
     public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes 
batchSizes,
             SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
-            ManagedCursor cursor, boolean isReplayRead) {
+            ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
         return filterEntriesForConsumer(Optional.empty(), 0, entries, 
batchSizes, sendMessageInfo, indexesAcks, cursor,
-                isReplayRead);
+                isReplayRead, consumer);
     }
 
     public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, 
int entryWrapperOffset,
              List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo 
sendMessageInfo,
-             EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean 
isReplayRead) {
+             EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean 
isReplayRead, Consumer consumer) {
         int totalMessages = 0;
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
         List<Position> entriesToFiltered = 
CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
+        List<PositionImpl> entriesToRedeliver = 
CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
         for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
             Entry entry = entries.get(i);
             if (entry == null) {
@@ -147,13 +149,20 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, 
subscription.toString(), -1)
                     : msgMetadata;
+            EntryFilter.FilterResult filterResult = 
EntryFilter.FilterResult.ACCEPT;
             if (CollectionUtils.isNotEmpty(entryFilters)) {
-                fillContext(filterContext, msgMetadata, subscription);
-                if (EntryFilter.FilterResult.REJECT == 
getFilterResult(filterContext, entry, entryFilters)) {
+                fillContext(filterContext, msgMetadata, subscription, 
consumer);
+                filterResult = getFilterResult(filterContext, entry, 
entryFilters);
+                if (filterResult == EntryFilter.FilterResult.REJECT) {
                     entriesToFiltered.add(entry.getPosition());
                     entries.set(i, null);
                     entry.release();
                     continue;
+                } else if (filterResult == 
EntryFilter.FilterResult.RESCHEDULE) {
+                    entriesToRedeliver.add((PositionImpl) entry.getPosition());
+                    entries.set(i, null);
+                    entry.release();
+                    continue;
                 }
             }
             if (!isReplayRead && msgMetadata != null && 
msgMetadata.hasTxnidMostBits()
@@ -227,6 +236,15 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
                 ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
             }
         }
+        if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
+            
this.subscription.getTopic().getBrokerService().getPulsar().getExecutor()
+                    .schedule(() -> {
+                        // simulate the Consumer rejected the message
+                        subscription
+                                .redeliverUnacknowledgedMessages(consumer, 
entriesToRedeliver);
+                    }, 1, TimeUnit.SECONDS);
+
+        }
 
         sendMessageInfo.setTotalMessages(totalMessages);
         sendMessageInfo.setTotalBytes(totalBytes);
@@ -236,21 +254,25 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
 
     private static EntryFilter.FilterResult getFilterResult(FilterContext 
filterContext, Entry entry,
                                                             
ImmutableList<EntryFilterWithClassLoader> entryFilters) {
-        EntryFilter.FilterResult result = EntryFilter.FilterResult.ACCEPT;
         for (EntryFilter entryFilter : entryFilters) {
-            if (entryFilter.filterEntry(entry, filterContext) == 
EntryFilter.FilterResult.REJECT) {
-                result = EntryFilter.FilterResult.REJECT;
-                break;
+            EntryFilter.FilterResult filterResult =
+                    entryFilter.filterEntry(entry, filterContext);
+            if (filterResult == null) {
+                filterResult = EntryFilter.FilterResult.ACCEPT;
+            }
+            if (filterResult != EntryFilter.FilterResult.ACCEPT) {
+                return filterResult;
             }
         }
-        return result;
+        return EntryFilter.FilterResult.ACCEPT;
     }
 
     private void fillContext(FilterContext context, MessageMetadata 
msgMetadata,
-                             Subscription subscription) {
+                             Subscription subscription, Consumer consumer) {
         context.reset();
         context.setMsgMetadata(msgMetadata);
         context.setSubscription(subscription);
+        context.setConsumer(consumer);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 7fff7a7f35d..96945840255 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -187,7 +187,7 @@ public class NonPersistentDispatcherMultipleConsumers 
extends AbstractDispatcher
         if (consumer != null) {
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, 
null, null, false);
+            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, 
null, null, false, consumer);
             consumer.sendMessages(entries, batchSizes, null, 
sendMessageInfo.getTotalMessages(),
                     sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 9c404d5d665..3c9ecb74c75 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -57,7 +57,7 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
         if (currentConsumer != null && currentConsumer.getAvailablePermits() > 
0 && currentConsumer.isWritable()) {
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, 
null, null, false);
+            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, 
null, null, false, currentConsumer);
             currentConsumer.sendMessages(entries, batchSizes, null, 
sendMessageInfo.getTotalMessages(),
                     sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
         } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 0dac0a235ba..e5e53496519 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -142,7 +142,7 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
 
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = 
EntryBatchSizes.get(entriesForConsumer.size());
-            filterEntriesForConsumer(entriesForConsumer, batchSizes, 
sendMessageInfo, null, null, false);
+            filterEntriesForConsumer(entriesForConsumer, batchSizes, 
sendMessageInfo, null, null, false, consumer);
 
             if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) {
                 consumer.sendMessages(entriesForConsumer, batchSizes, null, 
sendMessageInfo.getTotalMessages(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c9e5bcbcce6..a874706433e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -566,7 +566,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
                 totalEntries += 
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
                         entriesForThisConsumer, batchSizes, sendMessageInfo, 
batchIndexesAcks, cursor,
-                        readType == ReadType.Replay);
+                        readType == ReadType.Replay, c);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, 
batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index e062b070a65..a0af118f6e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -203,7 +203,8 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(entries.size());
-            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, 
batchIndexesAcks, cursor, false);
+            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, 
batchIndexesAcks, cursor, false,
+                    currentConsumer);
             dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, 
batchIndexesAcks, sendMessageInfo, epoch);
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index da46bf449fd..3e01531fc3e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -270,7 +270,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
                 EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(messagesForC);
                 totalEntries += filterEntriesForConsumer(entriesWithSameKey, 
batchSizes, sendMessageInfo,
-                        batchIndexesAcks, cursor, readType == ReadType.Replay);
+                        batchIndexesAcks, cursor, readType == ReadType.Replay, 
consumer);
 
                 consumer.sendMessages(entriesWithSameKey, batchSizes, 
batchIndexesAcks,
                         sendMessageInfo.getTotalMessages(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 0004ffd1df6..01ef7216d20 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -165,7 +165,7 @@ public class 
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(1);
             filterEntriesForConsumer(Lists.newArrayList(entry), batchSizes, 
sendMessageInfo, batchIndexesAcks,
-                    cursor, false);
+                    cursor, false, consumer);
             // Update cursor's read position.
             cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
                     .getNextValidPosition((PositionImpl) entry.getPosition()));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
index 40e6644953f..7246a3b1089 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
@@ -48,6 +48,10 @@ public interface EntryFilter {
          * skip the message.
          */
         REJECT,
+        /**
+         * postpone message, it should not go to this consumer.
+         */
+        RESCHEDULE
     }
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
index e520e1011f9..edbe6758165 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.plugin;
 
 import lombok.Data;
+import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 
@@ -26,10 +27,12 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 public class FilterContext {
     private Subscription subscription;
     private MessageMetadata msgMetadata;
+    private Consumer consumer;
 
     public void reset() {
         subscription = null;
         msgMetadata = null;
+        consumer = null;
     }
 
     public static final FilterContext FILTER_CONTEXT_DISABLED = new 
FilterContext();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index 07872f37683..09c2962cd65 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -72,7 +72,7 @@ public class AbstractBaseDispatcherTest {
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
 
-        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false);
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false, null);
         assertEquals(size, 0);
     }
 
@@ -99,7 +99,7 @@ public class AbstractBaseDispatcherTest {
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
         //
-        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false);
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false, null);
         assertEquals(size, 0);
     }
 
@@ -110,7 +110,7 @@ public class AbstractBaseDispatcherTest {
 
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false);
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false, null);
         assertEquals(size, 0);
     }
 
@@ -126,7 +126,7 @@ public class AbstractBaseDispatcherTest {
 
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false);
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false, null);
         assertEquals(size, 0);
     }
 
@@ -139,7 +139,7 @@ public class AbstractBaseDispatcherTest {
 
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false);
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false, null);
         assertEquals(size, 0);
     }
 
@@ -150,7 +150,7 @@ public class AbstractBaseDispatcherTest {
 
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false);
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, 
sendMessageInfo, null, null, false, null);
         assertEquals(size, 0);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
index 812d49aa7b4..c8d13c4826e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
@@ -20,22 +20,40 @@ package org.apache.pulsar.broker.service.plugin;
 
 
 import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.common.api.proto.KeyValue;
 
+@Slf4j
 public class EntryFilterTest implements EntryFilter {
     @Override
     public FilterResult filterEntry(Entry entry, FilterContext context) {
         if (context.getMsgMetadata() == null || 
context.getMsgMetadata().getPropertiesCount() <= 0) {
             return FilterResult.ACCEPT;
         }
+        Consumer consumer = context.getConsumer();
+        Map<String, String> metadata = consumer.getMetadata();
+        log.info("filterEntry for {}", metadata);
+        String matchValueAccept = metadata.getOrDefault("matchValueAccept", 
"ACCEPT");
+        String matchValueReject = metadata.getOrDefault("matchValueReject", 
"REJECT");
+        String matchValueReschedule = 
metadata.getOrDefault("matchValueReschedule", "RESCHEDULE");
         List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
         // filter by string
         for (KeyValue keyValue : list) {
-            if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) {
+            if (matchValueAccept.equalsIgnoreCase(keyValue.getKey())) {
+                log.info("metadata {} key {} outcome ACCEPT", metadata, 
keyValue.getKey());
                 return FilterResult.ACCEPT;
-            } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){
+            } else if (matchValueReject.equalsIgnoreCase(keyValue.getKey())){
+                log.info("metadata {} key {} outcome REJECT", metadata, 
keyValue.getKey());
                 return FilterResult.REJECT;
+            } else if 
(matchValueReschedule.equalsIgnoreCase(keyValue.getKey())){
+                log.info("metadata {} key {} outcome RESCHEDULE", metadata, 
keyValue.getKey());
+                return FilterResult.RESCHEDULE;
+            } else {
+                log.info("metadata {} key {} outcome ??", metadata, 
keyValue.getKey());
             }
         }
         return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 24b73ab8890..81ad811f43c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -30,7 +30,10 @@ import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
 import org.apache.pulsar.broker.service.AbstractTopic;
@@ -42,6 +45,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.awaitility.Awaitility;
@@ -50,6 +54,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
+@Slf4j
 public class FilterEntryTest extends BrokerTestBase {
     @BeforeMethod
     @Override
@@ -211,4 +216,115 @@ public class FilterEntryTest extends BrokerTestBase {
             assertEquals(filtered, 10);
         }
     }
+
+    @Test
+    public void 
testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription() throws 
Throwable {
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+
+        Map<String, String> metadataConsumer1 = new HashMap<>();
+        metadataConsumer1.put("matchValueAccept", "FOR-1");
+        metadataConsumer1.put("matchValueReschedule", "FOR-2");
+
+        Map<String, String> metadataConsumer2 = new HashMap<>();
+        metadataConsumer2.put("matchValueAccept", "FOR-2");
+        metadataConsumer2.put("matchValueReschedule", "FOR-1");
+
+        try (Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topic).create();
+             Consumer<String> consumer1 = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                     .subscriptionType(SubscriptionType.Shared)
+                     .properties(metadataConsumer1)
+                     .consumerName("consumer1")
+                     .receiverQueueSize(5)
+                     .subscriptionName(subName)
+                     .subscribe();
+             Consumer<String> consumer2 = 
pulsarClient.newConsumer(Schema.STRING)
+                     .subscriptionType(SubscriptionType.Shared)
+                     .properties(metadataConsumer2)
+                     .consumerName("consumer2")
+                     .topic(topic)
+                     .receiverQueueSize(5)
+                     .subscriptionName(subName)
+                     .subscribe()) {
+
+            // mock entry filters
+            PersistentSubscription subscription = (PersistentSubscription) 
pulsar.getBrokerService()
+                    .getTopicReference(topic).get().getSubscription(subName);
+            Dispatcher dispatcher = subscription.getDispatcher();
+            Field field = 
AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+            field.setAccessible(true);
+            NarClassLoader narClassLoader = mock(NarClassLoader.class);
+            EntryFilter filter1 = new EntryFilterTest();
+            EntryFilterWithClassLoader loader1 = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, 
narClassLoader);
+            EntryFilter filter2 = new EntryFilterTest();
+            EntryFilterWithClassLoader loader2 = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, 
narClassLoader);
+            field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+            int numMessages = 200;
+            for (int i = 0; i < numMessages; i++) {
+                if (i % 2 == 0) {
+                    producer.newMessage()
+                            .property("FOR-1", "")
+                            .value("consumer-1")
+                            .send();
+                } else {
+                    producer.newMessage()
+                            .property("FOR-2", "")
+                            .value("consumer-2")
+                            .send();
+                }
+            }
+
+            CompletableFuture<Void> resultConsume1 = new CompletableFuture<>();
+            pulsar.getExecutor().submit(() -> {
+                        try {
+                            // assert that the consumer1 receive all the 
messages and that such messages
+                            // are for consumer1
+                            int counter = 0;
+                            while (counter < numMessages / 2) {
+                                Message<String> message = consumer1.receive(1, 
TimeUnit.MINUTES);
+                                if (message != null) {
+                                    log.info("received1 {} - {}", 
message.getValue(), message.getProperties());
+                                    counter++;
+                                    assertEquals("consumer-1", 
message.getValue());
+                                    consumer1.acknowledgeAsync(message);
+                                } else {
+                                    resultConsume1.completeExceptionally(
+                                            new Exception("consumer1 did not 
receive all the messages"));
+                                }
+                            }
+                            resultConsume1.complete(null);
+                        } catch (Throwable err) {
+                            resultConsume1.completeExceptionally(err);
+                        }
+                    });
+
+            CompletableFuture<Void> resultConsume2 = new CompletableFuture<>();
+            pulsar.getExecutor().submit(() -> {
+                try {
+                    // assert that the consumer2 receive all the messages and 
that such messages
+                    // are for consumer2
+                    int counter = 0;
+                    while (counter < numMessages / 2) {
+                        Message<String> message = consumer2.receive(1, 
TimeUnit.MINUTES);
+                        if (message != null) {
+                            log.info("received2 {} - {}", message.getValue(), 
message.getProperties());
+                            counter++;
+                            assertEquals("consumer-2", message.getValue());
+                            consumer2.acknowledgeAsync(message);
+                        } else {
+                            resultConsume2.completeExceptionally(
+                                    new Exception("consumer2 did not receive 
all the messages"));
+                        }
+                    }
+                    resultConsume2.complete(null);
+                } catch (Throwable err) {
+                    resultConsume1.completeExceptionally(err);
+                }
+            });
+            resultConsume1.get(1, TimeUnit.MINUTES);
+            resultConsume2.get(1, TimeUnit.MINUTES);
+        }
+    }
 }

Reply via email to