This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 75d58fda2fe [fix][broker] Validate per-[namespace][topic] entry 
filters (#19422)
75d58fda2fe is described below

commit 75d58fda2fe99471abddc60f7ed671aaa6073e66
Author: Nicolò Boschi <[email protected]>
AuthorDate: Fri Feb 3 20:41:06 2023 +0100

    [fix][broker] Validate per-[namespace][topic] entry filters (#19422)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  24 ++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   2 +
 .../broker/admin/impl/PersistentTopicsBase.java    |  53 +++++----
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |   7 +-
 .../broker/service/plugin/EntryFilterProvider.java |  24 +++-
 .../plugin/InvalidEntryFilterException.java        |  30 +++++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 123 +++++++++++++++++++++
 7 files changed, 238 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index dd3ea8535b1..4190b4c486a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -39,7 +40,9 @@ import javax.ws.rs.core.Response.Status;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
@@ -52,6 +55,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -786,6 +790,26 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
     }
 
+    protected void validateEntryFilters(EntryFilters entryFilters) {
+        if (entryFilters == null) {
+            // remove entry filters
+            return;
+        }
+        if (StringUtils.isBlank(entryFilters.getEntryFilterNames())
+                || Arrays.stream(entryFilters.getEntryFilterNames().split(","))
+                        .filter(n -> StringUtils.isNotBlank(n))
+                        .findAny().isEmpty()) {
+            throw new RestException(new RestException(Status.BAD_REQUEST,
+                    "entryFilterNames can't be empty. To remove entry filters 
use the remove method."));
+        }
+        try {
+            pulsar().getBrokerService().getEntryFilterProvider()
+                    .validateEntryFilters(entryFilters.getEntryFilterNames());
+        } catch (InvalidEntryFilterException ex) {
+            throw new RestException(new RestException(Status.BAD_REQUEST, ex));
+        }
+    }
+
     /**
      * Check current exception whether is redirect exception.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 9b93752d5e4..d5cf6a3e74d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2479,12 +2479,14 @@ public abstract class NamespacesBase extends 
AdminResource {
     protected CompletableFuture<Void> 
internalSetEntryFiltersPerTopicAsync(EntryFilters entryFilters) {
         return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenAccept(__ -> validateEntryFilters(entryFilters))
                 .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
                     policies.entryFilters = entryFilters;
                     return policies;
                 }));
     }
 
+
     /**
      * Base method for setReplicatorDispatchRate v1 and v2.
      * Notion: don't re-use this logic.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index efe2c31918a..c5d465e747e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5378,34 +5378,47 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean 
applied, boolean isGlobal) {
         return validateTopicPolicyOperationAsync(topicName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
-                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal)
-                .thenApply(op -> op.map(TopicPolicies::getEntryFilters)
-                        .orElseGet(() -> {
-                            if (applied) {
-                                EntryFilters entryFilters = 
getNamespacePolicies(namespaceName).entryFilters;
-                                if (entryFilters == null) {
-                                    return new EntryFilters(String.join(",",
-                                            
pulsar().getConfiguration().getEntryFilterNames()));
+                .thenCompose(__ -> {
+                    if (!applied) {
+                        return getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal)
+                                .thenApply(op -> 
op.map(TopicPolicies::getEntryFilters).orElse(null));
+                    }
+                    if 
(!pulsar().getConfiguration().isAllowOverrideEntryFilters()) {
+                        return CompletableFuture.completedFuture(new 
EntryFilters(String.join(",",
+                                
pulsar().getConfiguration().getEntryFilterNames())));
+                    }
+                    return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                            .thenApply(op -> 
op.map(TopicPolicies::getEntryFilters))
+                            .thenCompose(policyEntryFilters -> {
+                                if (policyEntryFilters.isPresent()) {
+                                    return 
CompletableFuture.completedFuture(policyEntryFilters.get());
                                 }
-                                return entryFilters;
-                            }
-                            return null;
-                        })));
-
+                                return getNamespacePoliciesAsync(namespaceName)
+                                        .thenApply(policies -> 
policies.entryFilters)
+                                        .thenCompose(nsEntryFilters -> {
+                                            if (nsEntryFilters != null) {
+                                                return 
CompletableFuture.completedFuture(nsEntryFilters);
+                                            }
+                                            return 
CompletableFuture.completedFuture(new EntryFilters(String.join(",",
+                                                    
pulsar().getConfiguration().getEntryFilterNames())));
+                                        });
+                            });
+                });
     }
 
     protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters 
entryFilters,
                                                               boolean 
isGlobal) {
 
         return validateTopicPolicyOperationAsync(topicName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
+                .thenAccept(__ -> validateEntryFilters(entryFilters))
                 .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal)
-                        .thenCompose(op -> {
-                            TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
-                            topicPolicies.setEntryFilters(entryFilters);
-                            topicPolicies.setIsGlobal(isGlobal);
-                            return pulsar().getTopicPoliciesService()
-                                    .updateTopicPoliciesAsync(topicName, 
topicPolicies);
-                        }));
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setEntryFilters(entryFilters);
+                    topicPolicies.setIsGlobal(isGlobal);
+                    return pulsar().getTopicPoliciesService()
+                            .updateTopicPoliciesAsync(topicName, 
topicPolicies);
+                }));
     }
 
     protected CompletableFuture<Void> internalRemoveEntryFilters(boolean 
isGlobal) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index efaf038d632..80af5f4ad45 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2771,8 +2771,11 @@ public class Namespaces extends NamespacesBase {
     @POST
     @Path("/{tenant}/{namespace}/entryFilters")
     @ApiOperation(value = "Set entry filters for namespace")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist")})
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Specified entry filters are 
not valid"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist")
+    })
     public void setEntryFiltersPerTopic(@Suspended AsyncResponse 
asyncResponse, @PathParam("tenant") String tenant,
                                        @PathParam("namespace") String 
namespace,
                                        @ApiParam(value = "entry filters", 
required = true)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
index db643f43fa8..f93e561542e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
@@ -70,15 +70,33 @@ public class EntryFilterProvider implements AutoCloseable {
         }
     }
 
+    public void validateEntryFilters(String entryFilterNames) throws 
InvalidEntryFilterException {
+        if (StringUtils.isBlank(entryFilterNames)) {
+            return;
+        }
+        final List<String> entryFilterList = 
readEntryFiltersString(entryFilterNames);
+        for (String filterName : entryFilterList) {
+            EntryFilterMetaData metaData = definitions.get(filterName);
+            if (metaData == null) {
+                throw new InvalidEntryFilterException("Entry filter '" + 
filterName + "' not found");
+            }
+        }
+    }
+
+    private List<String> readEntryFiltersString(String entryFilterNames) {
+        final List<String> entryFilterList = 
Arrays.stream(entryFilterNames.split(","))
+                .filter(n -> StringUtils.isNotBlank(n))
+                .toList();
+        return entryFilterList;
+    }
+
     public List<EntryFilter> loadEntryFiltersForPolicy(EntryFilters policy)
             throws IOException {
         final String names = policy.getEntryFilterNames();
         if (StringUtils.isBlank(names)) {
             return Collections.emptyList();
         }
-        final List<String> entryFilterList = Arrays.stream(names.split(","))
-                .filter(n -> StringUtils.isNotBlank(n))
-                .toList();
+        final List<String> entryFilterList = readEntryFiltersString(names);
         return loadEntryFilters(entryFilterList);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/InvalidEntryFilterException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/InvalidEntryFilterException.java
new file mode 100644
index 00000000000..b37c78c65ae
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/InvalidEntryFilterException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+public class InvalidEntryFilterException extends Exception {
+
+    public InvalidEntryFilterException(String message) {
+        super(message);
+    }
+
+    public InvalidEntryFilterException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index c8ea5818d03..f7e7bcb4ea1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2397,6 +2397,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         final MockEntryFilterProvider testEntryFilterProvider =
                 new MockEntryFilterProvider(conf);
         conf.setEntryFilterNames(List.of("test", "test1"));
+        conf.setAllowOverrideEntryFilters(true);
 
         testEntryFilterProvider.setMockEntryFilters(new EntryFilterDefinition(
                         "test",
@@ -2420,6 +2421,8 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
                     .topic(fullTopicName)
                     .create();
             assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+            assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
true),
+                    new EntryFilters("test,test1"));
             assertEquals(pulsar
                     .getBrokerService()
                     .getTopic(fullTopicName, false)
@@ -2431,6 +2434,8 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             EntryFilters nsEntryFilters = new EntryFilters("test");
             admin.namespaces().setNamespaceEntryFilters("prop-xyz/ns1", 
nsEntryFilters);
             
assertEquals(admin.namespaces().getNamespaceEntryFilters("prop-xyz/ns1"), 
nsEntryFilters);
+            assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
true),
+                    new EntryFilters("test"));
             Awaitility.await().untilAsserted(() -> {
                 assertEquals(pulsar
                         .getBrokerService()
@@ -2459,6 +2464,8 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             admin.topicPolicies().setEntryFiltersPerTopic(topic, 
topicEntryFilters);
             Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
                     false), topicEntryFilters));
+            assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
true),
+                    new EntryFilters("test1"));
             Awaitility.await().untilAsserted(() -> {
                 assertEquals(pulsar
                         .getBrokerService()
@@ -2484,6 +2491,122 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(timeOut = 30000)
+    public void testValidateNamespaceEntryFilters() throws Exception {
+        final MockEntryFilterProvider testEntryFilterProvider =
+                new MockEntryFilterProvider(conf);
+
+        testEntryFilterProvider
+                .setMockEntryFilters(new EntryFilterDefinition(
+                        "test",
+                        null,
+                        EntryFilterTest.class.getName()
+                ));
+        final EntryFilterProvider oldEntryFilterProvider = 
pulsar.getBrokerService().getEntryFilterProvider();
+        FieldUtils.writeField(pulsar.getBrokerService(),
+                "entryFilterProvider", testEntryFilterProvider, true);
+
+        try {
+            final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+            admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
+            try {
+                admin.namespaces().setNamespaceEntryFilters(myNamespace, new 
EntryFilters("notexists"));
+                fail();
+            } catch (PulsarAdminException e) {
+                assertEquals(e.getStatusCode(), 400);
+                assertEquals(e.getMessage(), "Entry filter 'notexists' not 
found");
+            }
+            try {
+                admin.namespaces().setNamespaceEntryFilters(myNamespace, new 
EntryFilters(""));
+                fail();
+            } catch (PulsarAdminException e) {
+                assertEquals(e.getStatusCode(), 400);
+                assertEquals(e.getMessage(), "entryFilterNames can't be empty. 
" +
+                        "To remove entry filters use the remove method.");
+            }
+            try {
+                admin.namespaces().setNamespaceEntryFilters(myNamespace, new 
EntryFilters(","));
+                fail();
+            } catch (PulsarAdminException e) {
+                assertEquals(e.getStatusCode(), 400);
+                assertEquals(e.getMessage(), "entryFilterNames can't be empty. 
" +
+                        "To remove entry filters use the remove method.");
+            }
+            try {
+                admin.namespaces().setNamespaceEntryFilters(myNamespace, new 
EntryFilters("test,notexists"));
+                fail();
+            } catch (PulsarAdminException e) {
+                assertEquals(e.getStatusCode(), 400);
+                assertEquals(e.getMessage(), "Entry filter 'notexists' not 
found");
+            }
+            
assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+        } finally {
+            FieldUtils.writeField(pulsar.getBrokerService(),
+                    "entryFilterProvider", oldEntryFilterProvider, true);
+        }
+    }
+
+    @Test(timeOut = 30000)
+    public void testValidateTopicEntryFilters() throws Exception {
+        final MockEntryFilterProvider testEntryFilterProvider =
+                new MockEntryFilterProvider(conf);
+
+        testEntryFilterProvider
+                .setMockEntryFilters(new EntryFilterDefinition(
+                        "test",
+                        null,
+                        EntryFilterTest.class.getName()
+                ));
+        final EntryFilterProvider oldEntryFilterProvider = 
pulsar.getBrokerService().getEntryFilterProvider();
+        FieldUtils.writeField(pulsar.getBrokerService(),
+                "entryFilterProvider", testEntryFilterProvider, true);
+
+        try {
+            final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+            admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
+            final String topicName = myNamespace + "/topic";
+            admin.topics().createNonPartitionedTopic(topicName);
+            @Cleanup
+            Producer<byte[]> producer1 = pulsarClient.newProducer()
+                    .topic(topicName)
+                    .create();
+            try {
+                admin.topicPolicies().setEntryFiltersPerTopic(topicName, new 
EntryFilters("notexists"));
+                fail();
+            } catch (PulsarAdminException e) {
+                assertEquals(e.getStatusCode(), 400);
+                assertEquals(e.getMessage(), "Entry filter 'notexists' not 
found");
+            }
+            try {
+                admin.topicPolicies().setEntryFiltersPerTopic(topicName, new 
EntryFilters(""));
+                fail();
+            } catch (PulsarAdminException e) {
+                assertEquals(e.getStatusCode(), 400);
+                assertEquals(e.getMessage(), "entryFilterNames can't be empty. 
" +
+                        "To remove entry filters use the remove method.");
+            }
+            try {
+                admin.topicPolicies().setEntryFiltersPerTopic(topicName, new 
EntryFilters(","));
+                fail();
+            } catch (PulsarAdminException e) {
+                assertEquals(e.getStatusCode(), 400);
+                assertEquals(e.getMessage(), "entryFilterNames can't be empty. 
" +
+                        "To remove entry filters use the remove method.");
+            }
+            try {
+                admin.topicPolicies().setEntryFiltersPerTopic(topicName, new 
EntryFilters("test,notexists"));
+                fail();
+            } catch (PulsarAdminException e) {
+                assertEquals(e.getStatusCode(), 400);
+                assertEquals(e.getMessage(), "Entry filter 'notexists' not 
found");
+            }
+            
assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topicName, false));
+        } finally {
+            FieldUtils.writeField(pulsar.getBrokerService(),
+                    "entryFilterProvider", oldEntryFilterProvider, true);
+        }
+    }
+
     @Test(timeOut = 30000)
     public void testMaxSubPerTopic() throws Exception {
         pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);

Reply via email to