This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 75d58fda2fe [fix][broker] Validate per-[namespace][topic] entry
filters (#19422)
75d58fda2fe is described below
commit 75d58fda2fe99471abddc60f7ed671aaa6073e66
Author: Nicolò Boschi <[email protected]>
AuthorDate: Fri Feb 3 20:41:06 2023 +0100
[fix][broker] Validate per-[namespace][topic] entry filters (#19422)
---
.../apache/pulsar/broker/admin/AdminResource.java | 24 ++++
.../pulsar/broker/admin/impl/NamespacesBase.java | 2 +
.../broker/admin/impl/PersistentTopicsBase.java | 53 +++++----
.../apache/pulsar/broker/admin/v2/Namespaces.java | 7 +-
.../broker/service/plugin/EntryFilterProvider.java | 24 +++-
.../plugin/InvalidEntryFilterException.java | 30 +++++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 123 +++++++++++++++++++++
7 files changed, 238 insertions(+), 25 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index dd3ea8535b1..4190b4c486a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -39,7 +40,9 @@ import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
@@ -52,6 +55,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -786,6 +790,26 @@ public abstract class AdminResource extends
PulsarWebResource {
}
+ protected void validateEntryFilters(EntryFilters entryFilters) {
+ if (entryFilters == null) {
+ // remove entry filters
+ return;
+ }
+ if (StringUtils.isBlank(entryFilters.getEntryFilterNames())
+ || Arrays.stream(entryFilters.getEntryFilterNames().split(","))
+ .filter(n -> StringUtils.isNotBlank(n))
+ .findAny().isEmpty()) {
+ throw new RestException(new RestException(Status.BAD_REQUEST,
+ "entryFilterNames can't be empty. To remove entry filters
use the remove method."));
+ }
+ try {
+ pulsar().getBrokerService().getEntryFilterProvider()
+ .validateEntryFilters(entryFilters.getEntryFilterNames());
+ } catch (InvalidEntryFilterException ex) {
+ throw new RestException(new RestException(Status.BAD_REQUEST, ex));
+ }
+ }
+
/**
* Check current exception whether is redirect exception.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 9b93752d5e4..d5cf6a3e74d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2479,12 +2479,14 @@ public abstract class NamespacesBase extends
AdminResource {
protected CompletableFuture<Void>
internalSetEntryFiltersPerTopicAsync(EntryFilters entryFilters) {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenAccept(__ -> validateEntryFilters(entryFilters))
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
policies.entryFilters = entryFilters;
return policies;
}));
}
+
/**
* Base method for setReplicatorDispatchRate v1 and v2.
* Notion: don't re-use this logic.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index efe2c31918a..c5d465e747e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5378,34 +5378,47 @@ public class PersistentTopicsBase extends AdminResource
{
protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean
applied, boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
- .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName,
isGlobal)
- .thenApply(op -> op.map(TopicPolicies::getEntryFilters)
- .orElseGet(() -> {
- if (applied) {
- EntryFilters entryFilters =
getNamespacePolicies(namespaceName).entryFilters;
- if (entryFilters == null) {
- return new EntryFilters(String.join(",",
-
pulsar().getConfiguration().getEntryFilterNames()));
+ .thenCompose(__ -> {
+ if (!applied) {
+ return getTopicPoliciesAsyncWithRetry(topicName,
isGlobal)
+ .thenApply(op ->
op.map(TopicPolicies::getEntryFilters).orElse(null));
+ }
+ if
(!pulsar().getConfiguration().isAllowOverrideEntryFilters()) {
+ return CompletableFuture.completedFuture(new
EntryFilters(String.join(",",
+
pulsar().getConfiguration().getEntryFilterNames())));
+ }
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenApply(op ->
op.map(TopicPolicies::getEntryFilters))
+ .thenCompose(policyEntryFilters -> {
+ if (policyEntryFilters.isPresent()) {
+ return
CompletableFuture.completedFuture(policyEntryFilters.get());
}
- return entryFilters;
- }
- return null;
- })));
-
+ return getNamespacePoliciesAsync(namespaceName)
+ .thenApply(policies ->
policies.entryFilters)
+ .thenCompose(nsEntryFilters -> {
+ if (nsEntryFilters != null) {
+ return
CompletableFuture.completedFuture(nsEntryFilters);
+ }
+ return
CompletableFuture.completedFuture(new EntryFilters(String.join(",",
+
pulsar().getConfiguration().getEntryFilterNames())));
+ });
+ });
+ });
}
protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters
entryFilters,
boolean
isGlobal) {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
+ .thenAccept(__ -> validateEntryFilters(entryFilters))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName,
isGlobal)
- .thenCompose(op -> {
- TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
- topicPolicies.setEntryFilters(entryFilters);
- topicPolicies.setIsGlobal(isGlobal);
- return pulsar().getTopicPoliciesService()
- .updateTopicPoliciesAsync(topicName,
topicPolicies);
- }));
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+ topicPolicies.setEntryFilters(entryFilters);
+ topicPolicies.setIsGlobal(isGlobal);
+ return pulsar().getTopicPoliciesService()
+ .updateTopicPoliciesAsync(topicName,
topicPolicies);
+ }));
}
protected CompletableFuture<Void> internalRemoveEntryFilters(boolean
isGlobal) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index efaf038d632..80af5f4ad45 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2771,8 +2771,11 @@ public class Namespaces extends NamespacesBase {
@POST
@Path("/{tenant}/{namespace}/entryFilters")
@ApiOperation(value = "Set entry filters for namespace")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
- @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist")})
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Specified entry filters are
not valid"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist")
+ })
public void setEntryFiltersPerTopic(@Suspended AsyncResponse
asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String
namespace,
@ApiParam(value = "entry filters",
required = true)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
index db643f43fa8..f93e561542e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
@@ -70,15 +70,33 @@ public class EntryFilterProvider implements AutoCloseable {
}
}
+ public void validateEntryFilters(String entryFilterNames) throws
InvalidEntryFilterException {
+ if (StringUtils.isBlank(entryFilterNames)) {
+ return;
+ }
+ final List<String> entryFilterList =
readEntryFiltersString(entryFilterNames);
+ for (String filterName : entryFilterList) {
+ EntryFilterMetaData metaData = definitions.get(filterName);
+ if (metaData == null) {
+ throw new InvalidEntryFilterException("Entry filter '" +
filterName + "' not found");
+ }
+ }
+ }
+
+ private List<String> readEntryFiltersString(String entryFilterNames) {
+ final List<String> entryFilterList =
Arrays.stream(entryFilterNames.split(","))
+ .filter(n -> StringUtils.isNotBlank(n))
+ .toList();
+ return entryFilterList;
+ }
+
public List<EntryFilter> loadEntryFiltersForPolicy(EntryFilters policy)
throws IOException {
final String names = policy.getEntryFilterNames();
if (StringUtils.isBlank(names)) {
return Collections.emptyList();
}
- final List<String> entryFilterList = Arrays.stream(names.split(","))
- .filter(n -> StringUtils.isNotBlank(n))
- .toList();
+ final List<String> entryFilterList = readEntryFiltersString(names);
return loadEntryFilters(entryFilterList);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/InvalidEntryFilterException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/InvalidEntryFilterException.java
new file mode 100644
index 00000000000..b37c78c65ae
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/InvalidEntryFilterException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+public class InvalidEntryFilterException extends Exception {
+
+ public InvalidEntryFilterException(String message) {
+ super(message);
+ }
+
+ public InvalidEntryFilterException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index c8ea5818d03..f7e7bcb4ea1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2397,6 +2397,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
final MockEntryFilterProvider testEntryFilterProvider =
new MockEntryFilterProvider(conf);
conf.setEntryFilterNames(List.of("test", "test1"));
+ conf.setAllowOverrideEntryFilters(true);
testEntryFilterProvider.setMockEntryFilters(new EntryFilterDefinition(
"test",
@@ -2420,6 +2421,8 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
.topic(fullTopicName)
.create();
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false));
+ assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
true),
+ new EntryFilters("test,test1"));
assertEquals(pulsar
.getBrokerService()
.getTopic(fullTopicName, false)
@@ -2431,6 +2434,8 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
EntryFilters nsEntryFilters = new EntryFilters("test");
admin.namespaces().setNamespaceEntryFilters("prop-xyz/ns1",
nsEntryFilters);
assertEquals(admin.namespaces().getNamespaceEntryFilters("prop-xyz/ns1"),
nsEntryFilters);
+ assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
true),
+ new EntryFilters("test"));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
@@ -2459,6 +2464,8 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
admin.topicPolicies().setEntryFiltersPerTopic(topic,
topicEntryFilters);
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
false), topicEntryFilters));
+ assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
true),
+ new EntryFilters("test1"));
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar
.getBrokerService()
@@ -2484,6 +2491,122 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
}
}
+ @Test(timeOut = 30000)
+ public void testValidateNamespaceEntryFilters() throws Exception {
+ final MockEntryFilterProvider testEntryFilterProvider =
+ new MockEntryFilterProvider(conf);
+
+ testEntryFilterProvider
+ .setMockEntryFilters(new EntryFilterDefinition(
+ "test",
+ null,
+ EntryFilterTest.class.getName()
+ ));
+ final EntryFilterProvider oldEntryFilterProvider =
pulsar.getBrokerService().getEntryFilterProvider();
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", testEntryFilterProvider, true);
+
+ try {
+ final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+ try {
+ admin.namespaces().setNamespaceEntryFilters(myNamespace, new
EntryFilters("notexists"));
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 400);
+ assertEquals(e.getMessage(), "Entry filter 'notexists' not
found");
+ }
+ try {
+ admin.namespaces().setNamespaceEntryFilters(myNamespace, new
EntryFilters(""));
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 400);
+ assertEquals(e.getMessage(), "entryFilterNames can't be empty.
" +
+ "To remove entry filters use the remove method.");
+ }
+ try {
+ admin.namespaces().setNamespaceEntryFilters(myNamespace, new
EntryFilters(","));
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 400);
+ assertEquals(e.getMessage(), "entryFilterNames can't be empty.
" +
+ "To remove entry filters use the remove method.");
+ }
+ try {
+ admin.namespaces().setNamespaceEntryFilters(myNamespace, new
EntryFilters("test,notexists"));
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 400);
+ assertEquals(e.getMessage(), "Entry filter 'notexists' not
found");
+ }
+
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+ } finally {
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", oldEntryFilterProvider, true);
+ }
+ }
+
+ @Test(timeOut = 30000)
+ public void testValidateTopicEntryFilters() throws Exception {
+ final MockEntryFilterProvider testEntryFilterProvider =
+ new MockEntryFilterProvider(conf);
+
+ testEntryFilterProvider
+ .setMockEntryFilters(new EntryFilterDefinition(
+ "test",
+ null,
+ EntryFilterTest.class.getName()
+ ));
+ final EntryFilterProvider oldEntryFilterProvider =
pulsar.getBrokerService().getEntryFilterProvider();
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", testEntryFilterProvider, true);
+
+ try {
+ final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+ final String topicName = myNamespace + "/topic";
+ admin.topics().createNonPartitionedTopic(topicName);
+ @Cleanup
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ try {
+ admin.topicPolicies().setEntryFiltersPerTopic(topicName, new
EntryFilters("notexists"));
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 400);
+ assertEquals(e.getMessage(), "Entry filter 'notexists' not
found");
+ }
+ try {
+ admin.topicPolicies().setEntryFiltersPerTopic(topicName, new
EntryFilters(""));
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 400);
+ assertEquals(e.getMessage(), "entryFilterNames can't be empty.
" +
+ "To remove entry filters use the remove method.");
+ }
+ try {
+ admin.topicPolicies().setEntryFiltersPerTopic(topicName, new
EntryFilters(","));
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 400);
+ assertEquals(e.getMessage(), "entryFilterNames can't be empty.
" +
+ "To remove entry filters use the remove method.");
+ }
+ try {
+ admin.topicPolicies().setEntryFiltersPerTopic(topicName, new
EntryFilters("test,notexists"));
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 400);
+ assertEquals(e.getMessage(), "Entry filter 'notexists' not
found");
+ }
+
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topicName, false));
+ } finally {
+ FieldUtils.writeField(pulsar.getBrokerService(),
+ "entryFilterProvider", oldEntryFilterProvider, true);
+ }
+ }
+
@Test(timeOut = 30000)
public void testMaxSubPerTopic() throws Exception {
pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);