This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0975cdc2489 [enh] [broker] EntryFilter (PIP-105) - support
per-Consumer filtering (#15391)
0975cdc2489 is described below
commit 0975cdc2489739b7518ee7acc78bbc6e8c8e2e4d
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon May 9 13:20:04 2022 +0200
[enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering
(#15391)
---
.../broker/service/AbstractBaseDispatcher.java | 44 ++++++--
.../NonPersistentDispatcherMultipleConsumers.java | 2 +-
...onPersistentDispatcherSingleActiveConsumer.java | 2 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 2 +-
.../PersistentDispatcherMultipleConsumers.java | 2 +-
.../PersistentDispatcherSingleActiveConsumer.java | 3 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 2 +-
...entStreamingDispatcherSingleActiveConsumer.java | 2 +-
.../pulsar/broker/service/plugin/EntryFilter.java | 4 +
.../broker/service/plugin/FilterContext.java | 3 +
.../broker/service/AbstractBaseDispatcherTest.java | 12 +--
.../broker/service/plugin/EntryFilterTest.java | 22 +++-
.../broker/service/plugin/FilterEntryTest.java | 116 +++++++++++++++++++++
13 files changed, 190 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index e7c840b9279..7fc40038c7c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -121,19 +122,20 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
*/
public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes
batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
- ManagedCursor cursor, boolean isReplayRead) {
+ ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
return filterEntriesForConsumer(Optional.empty(), 0, entries,
batchSizes, sendMessageInfo, indexesAcks, cursor,
- isReplayRead);
+ isReplayRead, consumer);
}
public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper,
int entryWrapperOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo
sendMessageInfo,
- EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean
isReplayRead) {
+ EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean
isReplayRead, Consumer consumer) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
List<Position> entriesToFiltered =
CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
+ List<PositionImpl> entriesToRedeliver =
CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
@@ -147,13 +149,20 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
msgMetadata = msgMetadata == null
? Commands.peekMessageMetadata(metadataAndPayload,
subscription.toString(), -1)
: msgMetadata;
+ EntryFilter.FilterResult filterResult =
EntryFilter.FilterResult.ACCEPT;
if (CollectionUtils.isNotEmpty(entryFilters)) {
- fillContext(filterContext, msgMetadata, subscription);
- if (EntryFilter.FilterResult.REJECT ==
getFilterResult(filterContext, entry, entryFilters)) {
+ fillContext(filterContext, msgMetadata, subscription,
consumer);
+ filterResult = getFilterResult(filterContext, entry,
entryFilters);
+ if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
entry.release();
continue;
+ } else if (filterResult ==
EntryFilter.FilterResult.RESCHEDULE) {
+ entriesToRedeliver.add((PositionImpl) entry.getPosition());
+ entries.set(i, null);
+ entry.release();
+ continue;
}
}
if (!isReplayRead && msgMetadata != null &&
msgMetadata.hasTxnidMostBits()
@@ -227,6 +236,15 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
((AbstractTopic) topic).addFilteredEntriesCount(filtered);
}
}
+ if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
+
this.subscription.getTopic().getBrokerService().getPulsar().getExecutor()
+ .schedule(() -> {
+ // simulate the Consumer rejected the message
+ subscription
+ .redeliverUnacknowledgedMessages(consumer,
entriesToRedeliver);
+ }, 1, TimeUnit.SECONDS);
+
+ }
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
@@ -236,21 +254,25 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
private static EntryFilter.FilterResult getFilterResult(FilterContext
filterContext, Entry entry,
ImmutableList<EntryFilterWithClassLoader> entryFilters) {
- EntryFilter.FilterResult result = EntryFilter.FilterResult.ACCEPT;
for (EntryFilter entryFilter : entryFilters) {
- if (entryFilter.filterEntry(entry, filterContext) ==
EntryFilter.FilterResult.REJECT) {
- result = EntryFilter.FilterResult.REJECT;
- break;
+ EntryFilter.FilterResult filterResult =
+ entryFilter.filterEntry(entry, filterContext);
+ if (filterResult == null) {
+ filterResult = EntryFilter.FilterResult.ACCEPT;
+ }
+ if (filterResult != EntryFilter.FilterResult.ACCEPT) {
+ return filterResult;
}
}
- return result;
+ return EntryFilter.FilterResult.ACCEPT;
}
private void fillContext(FilterContext context, MessageMetadata
msgMetadata,
- Subscription subscription) {
+ Subscription subscription, Consumer consumer) {
context.reset();
context.setMsgMetadata(msgMetadata);
context.setSubscription(subscription);
+ context.setConsumer(consumer);
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 7fff7a7f35d..96945840255 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -187,7 +187,7 @@ public class NonPersistentDispatcherMultipleConsumers
extends AbstractDispatcher
if (consumer != null) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- filterEntriesForConsumer(entries, batchSizes, sendMessageInfo,
null, null, false);
+ filterEntriesForConsumer(entries, batchSizes, sendMessageInfo,
null, null, false, consumer);
consumer.sendMessages(entries, batchSizes, null,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 9c404d5d665..3c9ecb74c75 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -57,7 +57,7 @@ public final class
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
if (currentConsumer != null && currentConsumer.getAvailablePermits() >
0 && currentConsumer.isWritable()) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- filterEntriesForConsumer(entries, batchSizes, sendMessageInfo,
null, null, false);
+ filterEntriesForConsumer(entries, batchSizes, sendMessageInfo,
null, null, false, currentConsumer);
currentConsumer.sendMessages(entries, batchSizes, null,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 0dac0a235ba..e5e53496519 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -142,7 +142,7 @@ public class
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes =
EntryBatchSizes.get(entriesForConsumer.size());
- filterEntriesForConsumer(entriesForConsumer, batchSizes,
sendMessageInfo, null, null, false);
+ filterEntriesForConsumer(entriesForConsumer, batchSizes,
sendMessageInfo, null, null, false, consumer);
if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) {
consumer.sendMessages(entriesForConsumer, batchSizes, null,
sendMessageInfo.getTotalMessages(),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c9e5bcbcce6..a874706433e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -566,7 +566,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
totalEntries +=
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
entriesForThisConsumer, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor,
- readType == ReadType.Replay);
+ readType == ReadType.Replay, c);
c.sendMessages(entriesForThisConsumer, batchSizes,
batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index e062b070a65..a0af118f6e6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -203,7 +203,8 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(entries.size());
- filterEntriesForConsumer(entries, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, false);
+ filterEntriesForConsumer(entries, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, false,
+ currentConsumer);
dispatchEntriesToConsumer(currentConsumer, entries, batchSizes,
batchIndexesAcks, sendMessageInfo, epoch);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index da46bf449fd..3e01531fc3e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -270,7 +270,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(messagesForC);
totalEntries += filterEntriesForConsumer(entriesWithSameKey,
batchSizes, sendMessageInfo,
- batchIndexesAcks, cursor, readType == ReadType.Replay);
+ batchIndexesAcks, cursor, readType == ReadType.Replay,
consumer);
consumer.sendMessages(entriesWithSameKey, batchSizes,
batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 0004ffd1df6..01ef7216d20 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -165,7 +165,7 @@ public class
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(1);
filterEntriesForConsumer(Lists.newArrayList(entry), batchSizes,
sendMessageInfo, batchIndexesAcks,
- cursor, false);
+ cursor, false, consumer);
// Update cursor's read position.
cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
.getNextValidPosition((PositionImpl) entry.getPosition()));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
index 40e6644953f..7246a3b1089 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
@@ -48,6 +48,10 @@ public interface EntryFilter {
* skip the message.
*/
REJECT,
+ /**
+ * postpone message, it should not go to this conmumer.
+ */
+ RESCHEDULE
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
index e520e1011f9..edbe6758165 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.plugin;
import lombok.Data;
+import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -26,10 +27,12 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
public class FilterContext {
private Subscription subscription;
private MessageMetadata msgMetadata;
+ private Consumer consumer;
public void reset() {
subscription = null;
msgMetadata = null;
+ consumer = null;
}
public static final FilterContext FILTER_CONTEXT_DISABLED = new
FilterContext();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index 07872f37683..09c2962cd65 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -72,7 +72,7 @@ public class AbstractBaseDispatcherTest {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false);
+ int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@@ -99,7 +99,7 @@ public class AbstractBaseDispatcherTest {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
//
- int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false);
+ int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@@ -110,7 +110,7 @@ public class AbstractBaseDispatcherTest {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false);
+ int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@@ -126,7 +126,7 @@ public class AbstractBaseDispatcherTest {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false);
+ int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@@ -139,7 +139,7 @@ public class AbstractBaseDispatcherTest {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false);
+ int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@@ -150,7 +150,7 @@ public class AbstractBaseDispatcherTest {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false);
+ int size = this.helper.filterEntriesForConsumer(entries, batchSizes,
sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
index 812d49aa7b4..c8d13c4826e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
@@ -20,22 +20,40 @@ package org.apache.pulsar.broker.service.plugin;
import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.api.proto.KeyValue;
+@Slf4j
public class EntryFilterTest implements EntryFilter {
@Override
public FilterResult filterEntry(Entry entry, FilterContext context) {
if (context.getMsgMetadata() == null ||
context.getMsgMetadata().getPropertiesCount() <= 0) {
return FilterResult.ACCEPT;
}
+ Consumer consumer = context.getConsumer();
+ Map<String, String> metadata = consumer.getMetadata();
+ log.info("filterEntry for {}", metadata);
+ String matchValueAccept = metadata.getOrDefault("matchValueAccept",
"ACCEPT");
+ String matchValueReject = metadata.getOrDefault("matchValueReject",
"REJECT");
+ String matchValueReschedule =
metadata.getOrDefault("matchValueReschedule", "RESCHEDULE");
List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
// filter by string
for (KeyValue keyValue : list) {
- if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) {
+ if (matchValueAccept.equalsIgnoreCase(keyValue.getKey())) {
+ log.info("metadata {} key {} outcome ACCEPT", metadata,
keyValue.getKey());
return FilterResult.ACCEPT;
- } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){
+ } else if (matchValueReject.equalsIgnoreCase(keyValue.getKey())){
+ log.info("metadata {} key {} outcome REJECT", metadata,
keyValue.getKey());
return FilterResult.REJECT;
+ } else if
(matchValueReschedule.equalsIgnoreCase(keyValue.getKey())){
+ log.info("metadata {} key {} outcome RESCHEDULE", metadata,
keyValue.getKey());
+ return FilterResult.RESCHEDULE;
+ } else {
+ log.info("metadata {} key {} outcome ??", metadata,
keyValue.getKey());
}
}
return null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 24b73ab8890..81ad811f43c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -30,7 +30,10 @@ import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
import org.apache.pulsar.broker.service.AbstractTopic;
@@ -42,6 +45,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.awaitility.Awaitility;
@@ -50,6 +54,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
+@Slf4j
public class FilterEntryTest extends BrokerTestBase {
@BeforeMethod
@Override
@@ -211,4 +216,115 @@ public class FilterEntryTest extends BrokerTestBase {
assertEquals(filtered, 10);
}
}
+
+ @Test
+ public void
testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription() throws
Throwable {
+ String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+ String subName = "sub";
+
+ Map<String, String> metadataConsumer1 = new HashMap<>();
+ metadataConsumer1.put("matchValueAccept", "FOR-1");
+ metadataConsumer1.put("matchValueReschedule", "FOR-2");
+
+ Map<String, String> metadataConsumer2 = new HashMap<>();
+ metadataConsumer2.put("matchValueAccept", "FOR-2");
+ metadataConsumer2.put("matchValueReschedule", "FOR-1");
+
+ try (Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topic).create();
+ Consumer<String> consumer1 =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .properties(metadataConsumer1)
+ .consumerName("consumer1")
+ .receiverQueueSize(5)
+ .subscriptionName(subName)
+ .subscribe();
+ Consumer<String> consumer2 =
pulsarClient.newConsumer(Schema.STRING)
+ .subscriptionType(SubscriptionType.Shared)
+ .properties(metadataConsumer2)
+ .consumerName("consumer2")
+ .topic(topic)
+ .receiverQueueSize(5)
+ .subscriptionName(subName)
+ .subscribe()) {
+
+ // mock entry filters
+ PersistentSubscription subscription = (PersistentSubscription)
pulsar.getBrokerService()
+ .getTopicReference(topic).get().getSubscription(subName);
+ Dispatcher dispatcher = subscription.getDispatcher();
+ Field field =
AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+ field.setAccessible(true);
+ NarClassLoader narClassLoader = mock(NarClassLoader.class);
+ EntryFilter filter1 = new EntryFilterTest();
+ EntryFilterWithClassLoader loader1 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1,
narClassLoader);
+ EntryFilter filter2 = new EntryFilterTest();
+ EntryFilterWithClassLoader loader2 =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2,
narClassLoader);
+ field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+ int numMessages = 200;
+ for (int i = 0; i < numMessages; i++) {
+ if (i % 2 == 0) {
+ producer.newMessage()
+ .property("FOR-1", "")
+ .value("consumer-1")
+ .send();
+ } else {
+ producer.newMessage()
+ .property("FOR-2", "")
+ .value("consumer-2")
+ .send();
+ }
+ }
+
+ CompletableFuture<Void> resultConsume1 = new CompletableFuture<>();
+ pulsar.getExecutor().submit(() -> {
+ try {
+ // assert that the consumer1 receive all the
messages and that such messages
+ // are for consumer1
+ int counter = 0;
+ while (counter < numMessages / 2) {
+ Message<String> message = consumer1.receive(1,
TimeUnit.MINUTES);
+ if (message != null) {
+ log.info("received1 {} - {}",
message.getValue(), message.getProperties());
+ counter++;
+ assertEquals("consumer-1",
message.getValue());
+ consumer1.acknowledgeAsync(message);
+ } else {
+ resultConsume1.completeExceptionally(
+ new Exception("consumer1 did not
receive all the messages"));
+ }
+ }
+ resultConsume1.complete(null);
+ } catch (Throwable err) {
+ resultConsume1.completeExceptionally(err);
+ }
+ });
+
+ CompletableFuture<Void> resultConsume2 = new CompletableFuture<>();
+ pulsar.getExecutor().submit(() -> {
+ try {
+ // assert that the consumer2 receive all the messages and
that such messages
+ // are for consumer2
+ int counter = 0;
+ while (counter < numMessages / 2) {
+ Message<String> message = consumer2.receive(1,
TimeUnit.MINUTES);
+ if (message != null) {
+ log.info("received2 {} - {}", message.getValue(),
message.getProperties());
+ counter++;
+ assertEquals("consumer-2", message.getValue());
+ consumer2.acknowledgeAsync(message);
+ } else {
+ resultConsume2.completeExceptionally(
+ new Exception("consumer2 did not receive
all the messages"));
+ }
+ }
+ resultConsume2.complete(null);
+ } catch (Throwable err) {
+ resultConsume1.completeExceptionally(err);
+ }
+ });
+ resultConsume1.get(1, TimeUnit.MINUTES);
+ resultConsume2.get(1, TimeUnit.MINUTES);
+ }
+ }
}