This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fb7fb8f13f3f00189b758f0a60ea388ad693a389 Author: Lari Hotari <[email protected]> AuthorDate: Wed Jun 7 22:52:13 2023 +0300 [fix][broker] Disable EntryFilters for system topics (#20514) (cherry picked from commit ac46e2e4fc48dff74233623afa3635ef5285e34d) --- .../org/apache/pulsar/broker/service/AbstractTopic.java | 4 ++++ .../apache/pulsar/broker/service/EntryFilterSupport.java | 3 ++- .../pulsar/broker/service/persistent/SystemTopic.java | 13 +++++++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 4614b846c8e..1371019be41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1163,6 +1163,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP } public void updateEntryFilters() { + if (isSystemTopic()) { + entryFilters = Pair.of(null, Collections.emptyList()); + return; + } final EntryFilters entryFiltersPolicy = getEntryFiltersPolicy(); if (entryFiltersPolicy == null || StringUtils.isBlank(entryFiltersPolicy.getEntryFilterNames())) { entryFilters = Pair.of(null, Collections.emptyList()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java index 4a9b33a9afd..03d6f0750e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java @@ -35,7 +35,8 @@ public class EntryFilterSupport { public EntryFilterSupport(Subscription subscription) { this.subscription = subscription; - if (subscription != null && subscription.getTopic() != null) { + if (subscription != null && subscription.getTopic() != null + && !subscription.getTopic().isSystemTopic()) { final BrokerService brokerService = subscription.getTopic().getBrokerService(); final boolean allowOverrideEntryFilters = brokerService .pulsar().getConfiguration().isAllowOverrideEntryFilters(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 395a8c9075e..720ae3c5189 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -18,13 +18,16 @@ */ package org.apache.pulsar.broker.service.persistent; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.EntryFilters; public class SystemTopic extends PersistentTopic { @@ -82,4 +85,14 @@ public class SystemTopic extends PersistentTopic { // System topics are only written by the broker that can't know the encryption context. return false; } + + @Override + public EntryFilters getEntryFiltersPolicy() { + return null; + } + + @Override + public List<EntryFilter> getEntryFilters() { + return null; + } }
