This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a4c3034f52f [fix][broker] Execute per-topic entry filters with the same classloader (#19364)
a4c3034f52f is described below
commit a4c3034f52f857ae0f4daf5d366ea9e578133bc2
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Feb 1 20:55:30 2023 +0100
[fix][broker] Execute per-topic entry filters with the same classloader (#19364)
---
.../pulsar/broker/service/AbstractTopic.java | 35 ++-
.../pulsar/broker/service/BrokerService.java | 52 +---
.../pulsar/broker/service/EntryFilterSupport.java | 30 +--
.../org/apache/pulsar/broker/service/Topic.java | 5 +-
.../service/nonpersistent/NonPersistentTopic.java | 6 +-
.../broker/service/persistent/PersistentTopic.java | 6 +-
.../service/plugin/EntryFilterDefinition.java | 2 +
.../service/plugin/EntryFilterDefinitions.java | 28 ---
.../broker/service/plugin/EntryFilterProvider.java | 188 +++++++++------
.../service/plugin/EntryFilterWithClassLoader.java | 8 +
.../apache/pulsar/broker/admin/AdminApi2Test.java | 265 +++++++++++++++++++--
.../broker/service/AbstractBaseDispatcherTest.java | 16 +-
.../broker/service/plugin/FilterEntryTest.java | 133 +++++++++--
.../pulsar/broker/stats/ConsumerStatsTest.java | 4 +-
.../testcontext/MockEntryFilterProvider.java | 66 +++++
.../pulsar/common/policies/data/EntryFilters.java | 2 +-
16 files changed, 622 insertions(+), 224 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index c9f95ab524f..4e095cd66ba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -43,6 +43,7 @@ import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -53,7 +54,7 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyExcep
import
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -148,7 +149,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
protected final LongAdder bytesOutFromRemovedSubscriptions = new
LongAdder();
- protected Map<String, EntryFilterWithClassLoader> entryFilters;
+ protected volatile Pair<String, List<EntryFilter>> entryFilters;
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
@@ -188,8 +189,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
return this.topicPolicies.getEntryFilters().get();
}
- public Map<String, EntryFilterWithClassLoader> getEntryFilters() {
- return this.entryFilters;
+ public List<EntryFilter> getEntryFilters() {
+ return this.entryFilters.getRight();
}
public DispatchRateImpl getReplicatorDispatchRate() {
@@ -240,6 +241,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters());
this.subscriptionPolicies = data.getSubscriptionPolicies();
+
+ updateEntryFilters();
}
protected void updateTopicPolicyByNamespacePolicy(Policies
namespacePolicies) {
@@ -288,6 +291,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
updateNamespaceDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters);
+
+ updateEntryFilters();
}
private void updateNamespaceDispatchRate(Policies namespacePolicies,
String cluster) {
@@ -384,6 +389,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced());
topicPolicies.getEntryFilters().updateBrokerValue(new
EntryFilters(String.join(",",
config.getEntryFilterNames())));
+
+ updateEntryFilters();
}
private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config)
{
@@ -1158,6 +1165,26 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
}
+ public void updateEntryFilters() {
+ final EntryFilters entryFiltersPolicy = getEntryFiltersPolicy();
+ if (entryFiltersPolicy == null ||
StringUtils.isBlank(entryFiltersPolicy.getEntryFilterNames())) {
+ entryFilters = Pair.of(null, Collections.emptyList());
+ return;
+ }
+ final String entryFilterNames =
entryFiltersPolicy.getEntryFilterNames();
+ if (entryFilters != null &&
entryFilterNames.equals(entryFilters.getLeft())) {
+ return;
+ }
+ try {
+ final List<EntryFilter> filters =
+
brokerService.getEntryFilterProvider().loadEntryFiltersForPolicy(entryFiltersPolicy);
+ entryFilters = Pair.of(entryFilterNames, filters);
+ } catch (Throwable e) {
+ log.error("Failed to load entry filters on topic {}: {}", topic,
e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
public long getMsgInCounter() {
return this.msgInCounter.longValue();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d88f040f11b..f7020963fb7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -116,7 +116,6 @@ import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -147,7 +146,6 @@ import
org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -277,7 +275,7 @@ public class BrokerService implements Closeable {
private boolean preciseTopicPublishRateLimitingEnable;
private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;
- private Map<String, EntryFilterWithClassLoader> entryFilters;
+ private final EntryFilterProvider entryFilterProvider;
private TopicFactory topicFactory;
private Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors;
@@ -324,9 +322,7 @@ public class BrokerService implements Closeable {
new
ExecutorProvider.ExtendedThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
- if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
- this.entryFilters =
EntryFilterProvider.createEntryFilters(pulsar.getConfiguration());
- }
+ this.entryFilterProvider = new
EntryFilterProvider(pulsar.getConfiguration());
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
@@ -782,14 +778,8 @@ public class BrokerService implements Closeable {
});
//close entry filters
- if (entryFilters != null) {
- entryFilters.forEach((name, filter) -> {
- try {
- filter.close();
- } catch (Exception e) {
- log.warn("Error shutting down entry filter {}", name,
e);
- }
- });
+ if (entryFilterProvider != null) {
+ entryFilterProvider.close();
}
CompletableFuture<CompletableFuture<Void>>
cancellableDownstreamFutureReference = new CompletableFuture<>();
@@ -1189,27 +1179,13 @@ public class BrokerService implements Closeable {
NonPersistentTopic nonPersistentTopic;
try {
nonPersistentTopic = newTopic(topic, null, this,
NonPersistentTopic.class);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
return FutureUtil.failedFuture(e);
}
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
isOwner.thenRun(() -> {
nonPersistentTopic.initialize()
- .thenAccept(__ -> {
- EntryFilters entryFiltersPolicy =
nonPersistentTopic.getEntryFiltersPolicy();
- if
(!entryFiltersPolicy.getEntryFilterNames().isEmpty()) {
- try {
- nonPersistentTopic.entryFilters =
-
EntryFilterProvider.createEntryFilters(pulsar.getConfig(),
- entryFiltersPolicy);
- } catch (IOException e) {
- log.warn("Failed to set entry filters on topic
{}-{}", topic, e.getMessage());
- pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
- topicFuture.completeExceptionally(e);
- }
- }
- })
.thenCompose(__ -> nonPersistentTopic.checkReplication())
.thenRun(() -> {
log.info("Created topic {}", nonPersistentTopic);
@@ -1577,22 +1553,6 @@ public class BrokerService implements Closeable {
: newTopic(topic, ledger,
BrokerService.this, PersistentTopic.class);
persistentTopic
.initialize()
- .thenAccept(__ -> {
- EntryFilters entryFiltersPolicy =
persistentTopic.getEntryFiltersPolicy();
- if
(!entryFiltersPolicy.getEntryFilterNames().isEmpty()) {
- try {
-
persistentTopic.entryFilters =
-
EntryFilterProvider.createEntryFilters(pulsar.getConfig(),
-
entryFiltersPolicy);
- } catch (IOException e) {
- log.warn("Failed to set
entry filters on topic {}-{}", topic,
- e.getMessage());
-
pulsar.getExecutor().execute(() ->
-
topics.remove(topic, topicFuture));
-
topicFuture.completeExceptionally(e);
- }
- }
- })
.thenCompose(__ ->
persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ ->
persistentTopic.checkReplication())
.thenCompose(v -> {
@@ -1633,7 +1593,7 @@ public class BrokerService implements Closeable {
return null;
});
} catch (PulsarServerException e) {
- log.warn("Failed to create topic {}-{}",
topic, e.getMessage());
+ log.warn("Failed to create topic {}: {}",
topic, e.getMessage());
pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
index 6c0f1f65c69..4a9b33a9afd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
@@ -20,22 +20,15 @@ package org.apache.pulsar.broker.service;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.api.proto.MessageMetadata;
public class EntryFilterSupport {
- /**
- * Entry filters in Broker.
- * Not set to final, for the convenience of testing mock.
- */
- protected final List<EntryFilterWithClassLoader> entryFilters;
+ protected final List<EntryFilter> entryFilters;
protected final boolean hasFilter;
protected final FilterContext filterContext;
protected final Subscription subscription;
@@ -43,19 +36,18 @@ public class EntryFilterSupport {
public EntryFilterSupport(Subscription subscription) {
this.subscription = subscription;
if (subscription != null && subscription.getTopic() != null) {
- if (MapUtils.isNotEmpty(subscription.getTopic()
- .getBrokerService().getEntryFilters())
- && !subscription.getTopic().getBrokerService().pulsar()
- .getConfiguration().isAllowOverrideEntryFilters()) {
- this.entryFilters =
subscription.getTopic().getBrokerService().getEntryFilters().values().stream()
- .toList();
+ final BrokerService brokerService =
subscription.getTopic().getBrokerService();
+ final boolean allowOverrideEntryFilters = brokerService
+ .pulsar().getConfiguration().isAllowOverrideEntryFilters();
+ if (!allowOverrideEntryFilters) {
+ this.entryFilters =
brokerService.getEntryFilterProvider().getBrokerEntryFilters();
} else {
- Map<String, EntryFilterWithClassLoader> entryFiltersMap =
+ List<EntryFilter> topicEntryFilters =
subscription.getTopic().getEntryFilters();
- if (entryFiltersMap != null) {
- this.entryFilters =
subscription.getTopic().getEntryFilters().values().stream().toList();
+ if (topicEntryFilters != null && !topicEntryFilters.isEmpty())
{
+ this.entryFilters = topicEntryFilters;
} else {
- this.entryFilters = Collections.emptyList();
+ this.entryFilters =
brokerService.getEntryFilterProvider().getBrokerEntryFilters();
}
}
this.filterContext = new FilterContext();
@@ -86,7 +78,7 @@ public class EntryFilterSupport {
private static EntryFilter.FilterResult getFilterResult(FilterContext
filterContext, Entry entry,
-
List<EntryFilterWithClassLoader> entryFilters) {
+ List<EntryFilter>
entryFilters) {
for (EntryFilter entryFilter : entryFilters) {
EntryFilter.FilterResult filterResult =
entryFilter.filterEntry(entry, filterContext);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 3949df92cec..e6a29368dbb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import io.netty.buffer.ByteBuf;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -26,7 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
@@ -251,7 +252,7 @@ public interface Topic {
EntryFilters getEntryFiltersPolicy();
- Map<String, EntryFilterWithClassLoader> getEntryFilters();
+ List<EntryFilter> getEntryFilters();
BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index cf46103cc35..3b046570d73 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -510,11 +510,11 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
}
if (entryFilters != null) {
- entryFilters.forEach((name, filter) -> {
+ entryFilters.getRight().forEach(filter -> {
try {
filter.close();
- } catch (Exception e) {
- log.warn("Error shutting down entry filter {}", name, e);
+ } catch (Throwable e) {
+ log.warn("Error shutting down entry filter {}", filter, e);
}
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d009d3778f2..20744102a31 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1357,11 +1357,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
//close entry filters
if (entryFilters != null) {
- entryFilters.forEach((name, filter) -> {
+ entryFilters.getRight().forEach((filter) -> {
try {
filter.close();
- } catch (Exception e) {
- log.warn("Error shutting down entry filter {}", name, e);
+ } catch (Throwable e) {
+ log.warn("Error shutting down entry filter {}", filter, e);
}
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
index 36f39efa384..fd93a6be51e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
@@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.service.plugin;
+import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
+@AllArgsConstructor
public class EntryFilterDefinition {
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
deleted file mode 100644
index 384e7e2fcf4..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.plugin;
-
-import java.util.Map;
-import java.util.TreeMap;
-import lombok.Data;
-
-@Data
-public class EntryFilterDefinitions {
- private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
index 333f7f33339..db643f43fa8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
@@ -28,6 +28,12 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -37,74 +43,85 @@ import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@Slf4j
-public class EntryFilterProvider {
+public class EntryFilterProvider implements AutoCloseable {
@VisibleForTesting
static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter";
- /**
- * create entry filter instance.
- */
- public static ImmutableMap<String, EntryFilterWithClassLoader>
createEntryFilters(ServiceConfiguration conf,
-
EntryFilters entryFilters)
+ private final ServiceConfiguration serviceConfiguration;
+ @VisibleForTesting
+ protected Map<String, EntryFilterMetaData> definitions;
+ @VisibleForTesting
+ protected Map<String, NarClassLoader> cachedClassLoaders;
+ @VisibleForTesting
+ protected List<EntryFilter> brokerEntryFilters;
+
+ public EntryFilterProvider(ServiceConfiguration conf) throws IOException {
+ this.serviceConfiguration = conf;
+ initialize();
+ initializeBrokerEntryFilters();
+ }
+
+ protected void initializeBrokerEntryFilters() throws IOException {
+ if (!serviceConfiguration.getEntryFilterNames().isEmpty()) {
+ brokerEntryFilters =
loadEntryFilters(serviceConfiguration.getEntryFilterNames());
+ } else {
+ brokerEntryFilters = Collections.emptyList();
+ }
+ }
+
+ public List<EntryFilter> loadEntryFiltersForPolicy(EntryFilters policy)
throws IOException {
- EntryFilterDefinitions definitions =
searchForEntryFilters(conf.getEntryFiltersDirectory(),
- conf.getNarExtractionDirectory());
- ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder =
ImmutableMap.builder();
- for (String filterName :
entryFilters.getEntryFilterNames().split(",")) {
- EntryFilterMetaData metaData =
definitions.getFilters().get(filterName);
- if (null == metaData) {
- throw new RuntimeException("No entry filter is found for name
`" + filterName
- + "`. Available entry filters are : " +
definitions.getFilters());
- }
- EntryFilterWithClassLoader filter;
- filter = load(metaData, conf.getNarExtractionDirectory());
- if (filter != null) {
- builder.put(filterName, filter);
- }
- log.info("Successfully loaded entry filter for name `{}` from
topic policy", filterName);
+ final String names = policy.getEntryFilterNames();
+ if (StringUtils.isBlank(names)) {
+ return Collections.emptyList();
}
- return builder.build();
+ final List<String> entryFilterList = Arrays.stream(names.split(","))
+ .filter(n -> StringUtils.isNotBlank(n))
+ .toList();
+ return loadEntryFilters(entryFilterList);
}
- public static ImmutableMap<String, EntryFilterWithClassLoader>
createEntryFilters(
- ServiceConfiguration conf) throws IOException {
- EntryFilterDefinitions definitions =
searchForEntryFilters(conf.getEntryFiltersDirectory(),
- conf.getNarExtractionDirectory());
- ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder =
ImmutableMap.builder();
- for (String filterName : conf.getEntryFilterNames()) {
- EntryFilterMetaData metaData =
definitions.getFilters().get(filterName);
+ private List<EntryFilter> loadEntryFilters(Collection<String>
entryFilterNames)
+ throws IOException {
+ ImmutableMap.Builder<String, EntryFilter> builder =
ImmutableMap.builder();
+ for (String filterName : entryFilterNames) {
+ EntryFilterMetaData metaData = definitions.get(filterName);
if (null == metaData) {
throw new RuntimeException("No entry filter is found for name
`" + filterName
- + "`. Available entry filters are : " +
definitions.getFilters());
+ + "`. Available entry filters are : " +
definitions.keySet());
}
- EntryFilterWithClassLoader filter;
- filter = load(metaData, conf.getNarExtractionDirectory());
- if (filter != null) {
- builder.put(filterName, filter);
- }
- log.info("Successfully loaded entry filter for name `{}`",
filterName);
+ final EntryFilter entryFilter = load(metaData);
+ builder.put(filterName, entryFilter);
+ log.info("Successfully loaded entry filter `{}`", filterName);
}
- return builder.build();
+ return builder.build().values().asList();
}
- private static EntryFilterDefinitions searchForEntryFilters(String
entryFiltersDirectory,
-
String narExtractionDirectory)
- throws IOException {
+ public List<EntryFilter> getBrokerEntryFilters() {
+ return brokerEntryFilters;
+ }
+
+ private void initialize() throws IOException {
+ final String entryFiltersDirectory =
serviceConfiguration.getEntryFiltersDirectory();
Path path = Paths.get(entryFiltersDirectory).toAbsolutePath();
log.info("Searching for entry filters in {}", path);
- EntryFilterDefinitions entryFilterDefinitions = new
EntryFilterDefinitions();
+
if (!path.toFile().exists()) {
log.info("Pulsar entry filters directory not found");
- return entryFilterDefinitions;
+ definitions = Collections.emptyMap();
+ cachedClassLoaders = Collections.emptyMap();
+ return;
}
+ Map<String, EntryFilterMetaData> entryFilterDefinitions = new
HashMap<>();
+ cachedClassLoaders = new HashMap<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path,
"*.nar")) {
for (Path archive : stream) {
try {
- EntryFilterDefinition def =
- getEntryFilterDefinition(archive.toString(),
narExtractionDirectory);
+ final NarClassLoader narClassLoader =
loadNarClassLoader(archive);
+ EntryFilterDefinition def =
getEntryFilterDefinition(narClassLoader);
log.info("Found entry filter from {} : {}", archive, def);
checkArgument(StringUtils.isNotBlank(def.getName()));
@@ -114,7 +131,7 @@ public class EntryFilterProvider {
metadata.setDefinition(def);
metadata.setArchivePath(archive);
- entryFilterDefinitions.getFilters().put(def.getName(),
metadata);
+ entryFilterDefinitions.put(def.getName(), metadata);
} catch (Throwable t) {
log.warn("Failed to load entry filters from {}."
+ " It is OK however if you want to use this entry
filters,"
@@ -123,19 +140,8 @@ public class EntryFilterProvider {
}
}
}
-
- return entryFilterDefinitions;
- }
-
- private static EntryFilterDefinition getEntryFilterDefinition(String
narPath,
-
String narExtractionDirectory)
- throws IOException {
- try (NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(narPath))
- .extractionDirectory(narExtractionDirectory)
- .build()) {
- return getEntryFilterDefinition(ncl);
- }
+ definitions = Collections.unmodifiableMap(entryFilterDefinitions);
+ cachedClassLoaders = Collections.unmodifiableMap(cachedClassLoaders);
}
@VisibleForTesting
@@ -153,22 +159,19 @@ public class EntryFilterProvider {
);
}
- private static EntryFilterWithClassLoader load(EntryFilterMetaData
metadata,
- String
narExtractionDirectory)
+ protected EntryFilter load(EntryFilterMetaData metadata)
throws IOException {
- final File narFile =
metadata.getArchivePath().toAbsolutePath().toFile();
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(narFile)
- .parentClassLoader(EntryFilter.class.getClassLoader())
- .extractionDirectory(narExtractionDirectory)
- .build();
- EntryFilterDefinition def = getEntryFilterDefinition(ncl);
+ final EntryFilterDefinition def = metadata.getDefinition();
if (StringUtils.isBlank(def.getEntryFilterClass())) {
- throw new IOException("Entry filters `" + def.getName() + "` does
NOT provide a entry"
+ throw new RuntimeException("Entry filter `" + def.getName() + "`
does NOT provide a entry"
+ " filters implementation");
}
-
try {
+ final NarClassLoader ncl =
getNarClassLoader(metadata.getArchivePath());
+ if (ncl == null) {
+ throw new RuntimeException("Entry filter `" + def.getName() +
"` cannot be loaded, "
+ + "see the broker logs for further details");
+ }
Class entryFilterClass = ncl.loadClass(def.getEntryFilterClass());
Object filter =
entryFilterClass.getDeclaredConstructor().newInstance();
if (!(filter instanceof EntryFilter)) {
@@ -177,12 +180,55 @@ public class EntryFilterProvider {
}
EntryFilter pi = (EntryFilter) filter;
return new EntryFilterWithClassLoader(pi, ncl);
- } catch (Exception e) {
+ } catch (Throwable e) {
if (e instanceof IOException) {
throw (IOException) e;
}
- log.error("Failed to load class {}", def.getEntryFilterClass(), e);
+ log.error("Failed to load class {}",
metadata.getDefinition().getEntryFilterClass(), e);
throw new IOException(e);
}
}
+
+ private NarClassLoader getNarClassLoader(Path archivePath) {
+ return cachedClassLoaders.get(classLoaderKey(archivePath));
+ }
+
+ private NarClassLoader loadNarClassLoader(Path archivePath) {
+ final String absolutePath = classLoaderKey(archivePath);
+ return cachedClassLoaders
+ .computeIfAbsent(absolutePath, narFilePath -> {
+ try {
+ final File narFile =
archivePath.toAbsolutePath().toFile();
+ return NarClassLoaderBuilder.builder()
+ .narFile(narFile)
+
.parentClassLoader(EntryFilter.class.getClassLoader())
+
.extractionDirectory(serviceConfiguration.getNarExtractionDirectory())
+ .build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private static String classLoaderKey(Path archivePath) {
+ return archivePath.toString();
+ }
+
+ @Override
+ public void close() throws Exception {
+ brokerEntryFilters.forEach((filter) -> {
+ try {
+ filter.close();
+ } catch (Throwable e) {
+ log.warn("Error shutting down entry filter {}", filter, e);
+ }
+ });
+ cachedClassLoaders.forEach((name, ncl) -> {
+ try {
+ ncl.close();
+ } catch (Throwable e) {
+ log.warn("Error closing entry filter class loader {}", name,
e);
+ }
+ });
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
index 29a5dea119b..c5c57210877 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.broker.service.plugin;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.common.nar.NarClassLoader;
@Slf4j
+@ToString
public class EntryFilterWithClassLoader implements EntryFilter {
private final EntryFilter entryFilter;
private final NarClassLoader classLoader;
@@ -38,6 +41,11 @@ public class EntryFilterWithClassLoader implements
EntryFilter {
return entryFilter.filterEntry(entry, context);
}
+ @VisibleForTesting
+ public EntryFilter getEntryFilter() {
+ return entryFilter;
+ }
+
@Override
public void close() {
entryFilter.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 95b91fde1e1..c8ea5818d03 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -56,6 +56,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -66,6 +67,13 @@ import
org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilter2Test;
+import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Mode;
@@ -2236,35 +2244,244 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 30000)
public void testSetNamespaceEntryFilters() throws Exception {
- EntryFilters entryFilters = new EntryFilters(
- "org.apache.pulsar.broker.service.plugin.EntryFilterTest");
+ final MockEntryFilterProvider testEntryFilterProvider =
+ new MockEntryFilterProvider(conf);
+
+ testEntryFilterProvider
+ .setMockEntryFilters(new EntryFilterDefinition(
+ "test",
+ null,
+ EntryFilterTest.class.getName()
+ ));
+ final EntryFilterProvider oldEntryFilterProvider =
pulsar.getBrokerService().getEntryFilterProvider();
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", testEntryFilterProvider, true);
- final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
- admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
-
- assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
-
- admin.namespaces().setNamespaceEntryFilters(myNamespace, entryFilters);
- assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace),
entryFilters);
- admin.namespaces().removeNamespaceEntryFilters(myNamespace);
- assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+ try {
+ EntryFilters entryFilters = new EntryFilters("test");
+
+ final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+ final String topicName = myNamespace + "/topic";
+ admin.topics().createNonPartitionedTopic(topicName);
+
+
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(topicName, false)
+ .get()
+ .get()
+ .getEntryFilters()
+ .size(), 0);
+
+ admin.namespaces().setNamespaceEntryFilters(myNamespace,
entryFilters);
+
assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace),
entryFilters);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(topicName, false)
+ .get()
+ .get()
+ .getEntryFiltersPolicy()
+ .getEntryFilterNames(), "test");
+ });
+
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(topicName, false)
+ .get()
+ .get()
+ .getEntryFilters()
+ .size(), 1);
+ admin.namespaces().removeNamespaceEntryFilters(myNamespace);
+
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(topicName, false)
+ .get()
+ .get()
+ .getEntryFiltersPolicy()
+ .getEntryFilterNames(), "");
+ });
+
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(topicName, false)
+ .get()
+ .get()
+ .getEntryFilters()
+ .size(), 0);
+ } finally {
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", oldEntryFilterProvider, true);
+ }
}
@Test(dataProvider = "topicType")
public void testSetTopicLevelEntryFilters(String topicType) throws
Exception {
- EntryFilters entryFilters = new
EntryFilters("org.apache.pulsar.broker.service.plugin.EntryFilterTest");
- final String topic = topicType +
"://prop-xyz/ns1/test-schema-validation-enforced";
- admin.topics().createPartitionedTopic(topic, 1);
- @Cleanup
- Producer<byte[]> producer1 = pulsarClient.newProducer()
- .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
- .create();
- assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false));
- admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters);
- Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
- false), entryFilters));
- admin.topicPolicies().removeEntryFiltersPerTopic(topic);
- assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false));
+ final MockEntryFilterProvider testEntryFilterProvider =
+ new MockEntryFilterProvider(conf);
+
+ testEntryFilterProvider
+ .setMockEntryFilters(new EntryFilterDefinition(
+ "test",
+ null,
+ EntryFilterTest.class.getName()
+ ));
+ final EntryFilterProvider oldEntryFilterProvider =
pulsar.getBrokerService().getEntryFilterProvider();
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", testEntryFilterProvider, true);
+ try {
+ EntryFilters entryFilters = new EntryFilters("test");
+ final String topic = topicType +
"://prop-xyz/ns1/test-schema-validation-enforced";
+ admin.topics().createPartitionedTopic(topic, 1);
+ final String fullTopicName = topic +
TopicName.PARTITIONED_TOPIC_SUFFIX + 0;
+ @Cleanup
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(fullTopicName)
+ .create();
+ assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false));
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFilters()
+ .size(), 0);
+ admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters);
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
+ false), entryFilters));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFiltersPolicy()
+ .getEntryFilterNames(), "test");
+ });
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFilters()
+ .size(), 1);
+ admin.topicPolicies().removeEntryFiltersPerTopic(topic);
+ assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFiltersPolicy()
+ .getEntryFilterNames(), "");
+ });
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFilters()
+ .size(), 0);
+ } finally {
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", oldEntryFilterProvider, true);
+ }
+ }
+
+ @Test(timeOut = 30000)
+ public void testSetEntryFiltersHierarchy() throws Exception {
+ final MockEntryFilterProvider testEntryFilterProvider =
+ new MockEntryFilterProvider(conf);
+ conf.setEntryFilterNames(List.of("test", "test1"));
+
+ testEntryFilterProvider.setMockEntryFilters(new EntryFilterDefinition(
+ "test",
+ null,
+ EntryFilterTest.class.getName()
+ ), new EntryFilterDefinition(
+ "test1",
+ null,
+ EntryFilter2Test.class.getName()
+ ));
+ final EntryFilterProvider oldEntryFilterProvider =
pulsar.getBrokerService().getEntryFilterProvider();
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", testEntryFilterProvider, true);
+ try {
+
+ final String topic =
"persistent://prop-xyz/ns1/test-schema-validation-enforced";
+ admin.topics().createPartitionedTopic(topic, 1);
+ final String fullTopicName = topic +
TopicName.PARTITIONED_TOPIC_SUFFIX + 0;
+ @Cleanup
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(fullTopicName)
+ .create();
+ assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false));
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFilters()
+ .size(), 2);
+
+ EntryFilters nsEntryFilters = new EntryFilters("test");
+ admin.namespaces().setNamespaceEntryFilters("prop-xyz/ns1",
nsEntryFilters);
+
assertEquals(admin.namespaces().getNamespaceEntryFilters("prop-xyz/ns1"),
nsEntryFilters);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFiltersPolicy()
+ .getEntryFilterNames(), "test");
+ });
+
+ Awaitility.await().untilAsserted(() -> {
+ final List<EntryFilter> entryFilters = pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFilters();
+ assertEquals(entryFilters.size(), 1);
+ assertEquals(((EntryFilterWithClassLoader)entryFilters.get(0))
+ .getEntryFilter().getClass(), EntryFilterTest.class);
+
+ });
+
+
+ EntryFilters topicEntryFilters = new EntryFilters("test1");
+ admin.topicPolicies().setEntryFiltersPerTopic(topic,
topicEntryFilters);
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
+ false), topicEntryFilters));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFiltersPolicy()
+ .getEntryFilterNames(), "test1");
+ });
+ final List<EntryFilter> entryFilters = pulsar
+ .getBrokerService()
+ .getTopic(fullTopicName, false)
+ .get()
+ .get()
+ .getEntryFilters();
+ assertEquals(entryFilters.size(), 1);
+ assertEquals(((EntryFilterWithClassLoader) entryFilters.get(0))
+ .getEntryFilter().getClass(), EntryFilter2Test.class);
+
+ } finally {
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", oldEntryFilterProvider, true);
+ }
}
@Test(timeOut = 30000)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index 554ef1c3f96..cc2ec3444d5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -29,19 +29,19 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -87,13 +87,19 @@ public class AbstractBaseDispatcherTest {
Topic mockTopic = mock(Topic.class);
when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
+ final EntryFilterProvider entryFilterProvider =
mock(EntryFilterProvider.class);
+ final ServiceConfiguration serviceConfiguration =
mock(ServiceConfiguration.class);
+
when(serviceConfiguration.isAllowOverrideEntryFilters()).thenReturn(true);
+ final PulsarService pulsar = mock(PulsarService.class);
+ when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
BrokerService mockBrokerService = mock(BrokerService.class);
+ when(mockBrokerService.pulsar()).thenReturn(pulsar);
+
when(mockBrokerService.getEntryFilterProvider()).thenReturn(entryFilterProvider);
when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
- EntryFilterWithClassLoader mockFilter =
mock(EntryFilterWithClassLoader.class);
+ EntryFilter mockFilter = mock(EntryFilter.class);
when(mockFilter.filterEntry(any(Entry.class),
any(FilterContext.class))).thenReturn(
EntryFilter.FilterResult.REJECT);
- Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key",
mockFilter);
- when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
+ when(mockTopic.getEntryFilters()).thenReturn(List.of(mockFilter));
DispatchRateLimiter subscriptionDispatchRateLimiter =
mock(DispatchRateLimiter.class);
this.helper = new
AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index d5131e5df7b..4b9d91fbde2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -23,12 +23,14 @@ import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
import static
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
+
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
@@ -37,9 +39,14 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
@@ -59,6 +66,7 @@ import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -92,22 +100,15 @@ public class FilterEntryTest extends BrokerTestBase {
.getTopicReference(topic).get();
// set topic level entry filters
- EntryFilterWithClassLoader mockFilter =
mock(EntryFilterWithClassLoader.class);
+ EntryFilter mockFilter = mock(EntryFilter.class);
when(mockFilter.filterEntry(any(Entry.class),
any(FilterContext.class))).thenReturn(
EntryFilter.FilterResult.REJECT);
- Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key",
mockFilter);
-
- Field field =
topicRef.getClass().getSuperclass().getDeclaredField("entryFilters");
- field.setAccessible(true);
- field.set(topicRef, entryFilters);
+ setMockFilterToTopic(topicRef, List.of(mockFilter));
- EntryFilterWithClassLoader mockFilter1 =
mock(EntryFilterWithClassLoader.class);
+ EntryFilter mockFilter1 = mock(EntryFilter.class);
when(mockFilter1.filterEntry(any(Entry.class),
any(FilterContext.class))).thenReturn(
EntryFilter.FilterResult.ACCEPT);
- Map<String, EntryFilterWithClassLoader> entryFilters1 = Map.of("key2",
mockFilter1);
- Field field2 =
pulsar.getBrokerService().getClass().getDeclaredField("entryFilters");
- field2.setAccessible(true);
- field2.set(pulsar.getBrokerService(), entryFilters1);
+ setMockBrokerFilter(List.of(mockFilter1));
Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionInitialPosition(Earliest)
@@ -148,11 +149,22 @@ public class FilterEntryTest extends BrokerTestBase {
consumer.close();
}
+ @SneakyThrows
+ private void setMockFilterToTopic(PersistentTopic topicRef,
List<EntryFilter> mockFilter) {
+ FieldUtils.writeField(topicRef, "entryFilters", Pair.of(null,
mockFilter), true);
+ }
+
+ @SneakyThrows
+ private void setMockBrokerFilter(List<EntryFilter> mockFilter) {
+
FieldUtils.writeField(pulsar.getBrokerService().getEntryFilterProvider(),
+ "brokerEntryFilters", mockFilter, true);
+ }
+
@Test
public void testFilter() throws Exception {
Map<String, String> map = new HashMap<>();
- map.put("1","1");
- map.put("2","2");
+ map.put("1", "1");
+ map.put("2", "2");
String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
String subName = "sub";
Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
@@ -266,9 +278,7 @@ public class FilterEntryTest extends BrokerTestBase {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(topic).get();
- Field field1 =
topicRef.getClass().getSuperclass().getDeclaredField("entryFilters");
- field1.setAccessible(true);
- field1.set(topicRef, Map.of("1", loader1, "2", loader2));
+ setMockFilterToTopic(topicRef, List.of(loader1, loader2));
cleanup();
verify(loader1, times(1)).close();
@@ -471,7 +481,7 @@ public class FilterEntryTest extends BrokerTestBase {
int numEntriesAccepted, int numMessagesAccepted,
int numEntriesRejected, int numMessagesRejected,
int numEntriesRescheduled, int
numMessagesRescheduled
- ) throws Exception {
+ ) throws Exception {
AnalyzeSubscriptionBacklogResult a1
= admin.topics().analyzeSubscriptionBacklog(topic,
subscription, Optional.empty());
@@ -485,4 +495,93 @@ public class FilterEntryTest extends BrokerTestBase {
Assert.assertEquals(numMessagesRejected,
a1.getFilterRejectedMessages());
Assert.assertEquals(numMessagesRescheduled,
a1.getFilterRescheduledMessages());
}
+
+
+ @DataProvider(name = "overrideBrokerEntryFilters")
+ public static Object[][] overrideBrokerEntryFilters() {
+ return new Object[][]{ {true}, {false} };
+ }
+
+
+ @Test(dataProvider = "overrideBrokerEntryFilters")
+ public void testExecuteInOrder(boolean overrideBrokerEntryFilters) throws
Exception {
+ conf.setAllowOverrideEntryFilters(true);
+ String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+ String subName = "sub";
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topic).create();
+ for (int i = 0; i < 10; i++) {
+ producer.send("test");
+ }
+
+ EntryFilter mockFilterReject = mock(EntryFilter.class);
+ when(mockFilterReject.filterEntry(any(Entry.class),
any(FilterContext.class))).thenReturn(
+ EntryFilter.FilterResult.REJECT);
+ EntryFilter mockFilterAccept = mock(EntryFilter.class);
+ when(mockFilterAccept.filterEntry(any(Entry.class),
any(FilterContext.class))).thenReturn(
+ EntryFilter.FilterResult.ACCEPT);
+ if (overrideBrokerEntryFilters) {
+ setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
+ .getTopicReference(topic).get(), List.of(mockFilterReject,
mockFilterAccept));
+ } else {
+ setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
+ .getTopicReference(topic).get(), List.of());
+ setMockBrokerFilter(List.of(mockFilterReject, mockFilterAccept));
+ }
+
+
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+ .subscriptionInitialPosition(Earliest)
+ .subscriptionName(subName).subscribe();
+
+ int counter = 0;
+ while (true) {
+ Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ if (message != null) {
+ counter++;
+ consumer.acknowledge(message);
+ } else {
+ break;
+ }
+ }
+ // No message can be received: the reject filter runs first and rejects every entry
+ assertEquals(0, counter);
+ consumer.close();
+ verify(mockFilterReject, times(10))
+ .filterEntry(any(Entry.class), any(FilterContext.class));
+ verify(mockFilterAccept, never())
+ .filterEntry(any(Entry.class), any(FilterContext.class));
+
+ if (overrideBrokerEntryFilters) {
+ setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
+ .getTopicReference(topic).get(), List.of(mockFilterAccept,
mockFilterReject));
+ } else {
+ setMockFilterToTopic((PersistentTopic) pulsar.getBrokerService()
+ .getTopicReference(topic).get(), List.of());
+ setMockBrokerFilter(List.of(mockFilterAccept, mockFilterReject));
+ }
+
+ @Cleanup
+ Consumer<String> consumer2 =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+ .subscriptionInitialPosition(Earliest)
+ .subscriptionName(subName + "-2").subscribe();
+
+ counter = 0;
+ while (true) {
+ Message<String> message = consumer2.receive(1, TimeUnit.SECONDS);
+ if (message != null) {
+ counter++;
+ consumer2.acknowledge(message);
+ } else {
+ break;
+ }
+ }
+ assertEquals(0, counter);
+ verify(mockFilterReject, times(20))
+ .filterEntry(any(Entry.class), any(FilterContext.class));
+ verify(mockFilterAccept, times(10))
+ .filterEntry(any(Entry.class), any(FilterContext.class));
+
+
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 13f1b3cc8e2..bbeee9f5a49 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
@@ -374,6 +375,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
@Test
public void testAvgMessagesPerEntry() throws Exception {
+ conf.setAllowOverrideEntryFilters(true);
final String topic = "persistent://public/default/testFilterState";
String subName = "sub";
@@ -406,7 +408,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
EntryFilterWithClassLoader
loader =
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter,
narClassLoader);
- Map<String, EntryFilterWithClassLoader> entryFilters =
Map.of("filter", loader);
+ Pair<String, List<EntryFilter>> entryFilters = Pair.of("filter",
List.of(loader));
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
.getTopicReference(topic).get();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java
new file mode 100644
index 00000000000..425f4ee41a7
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockEntryFilterProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import lombok.SneakyThrows;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition;
+import org.apache.pulsar.broker.service.plugin.EntryFilterMetaData;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockEntryFilterProvider extends EntryFilterProvider {
+
+ public MockEntryFilterProvider(ServiceConfiguration config) throws
IOException {
+ super(config);
+ }
+
+ @SneakyThrows
+ public void setMockEntryFilters(EntryFilterDefinition... defs) {
+ definitions = new HashMap<>();
+ cachedClassLoaders = new HashMap<>();
+ brokerEntryFilters = new ArrayList<>();
+
+ for (EntryFilterDefinition def : defs) {
+ final String name = def.getName();
+ final EntryFilterMetaData meta = new EntryFilterMetaData();
+ meta.setDefinition(def);
+ meta.setArchivePath(Path.of(name));
+ definitions.put(name, meta);
+ final NarClassLoader ncl = mock(NarClassLoader.class);
+
+ when(ncl.loadClass(anyString())).thenAnswer(a -> {
+ final Object argument = a.getArguments()[0];
+ return
Thread.currentThread().getContextClassLoader().loadClass(argument.toString());
+ });
+ cachedClassLoaders.put(Path.of(name).toString(), ncl);
+ }
+ initializeBrokerEntryFilters();
+ }
+
+}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
index 5192e9bad3a..5ebe793d14b 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
@@ -28,7 +28,7 @@ import lombok.NoArgsConstructor;
public class EntryFilters {
/**
- * The class name for the entry filter.
+ * Names of the entry filters to apply, separated by commas.
*/
private String entryFilterNames;