eolivelli commented on code in PR #17153:
URL: https://github.com/apache/pulsar/pull/17153#discussion_r949016124


##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java:
##########
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class EntryFilters {
+    /**
+     * The description of the entry filter to be used for user help.
+     */
+    private String description;
+
+    /**
+     * The class name for the entry filter.
+     */
+    private String entryFilterNames;
+
+    /**
+     * The directory for all the entry filter implementations.
+     */
+    private String entryFiltersDirectory;

Review Comment:
   This is a parameter that is only useful only in ServiceConfiguration 
(broker.conf) because it refers to the local filesystem of every specific 
broker.
   
   I think we should drop this from here



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -2651,5 +2652,66 @@ public void finished(int total, int errors, int unknown) 
throws Exception {
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/entryFilters")
+    @ApiOperation(value = "Get maxConsumersPerSubscription config on a 
namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public void getEntryFiltersPerTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(polices -> 
asyncResponse.resume(polices.entryFilters))

Review Comment:
   we must not  return the entry filters directory to clients, as it will 
disclose sensitive information about the broker filesystem
   
   please create a new java class to model the response and report only the 
list of enabled entry filters



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java:
##########
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class EntryFilters {
+    /**
+     * The description of the entry filter to be used for user help.
+     */
+    private String description;

Review Comment:
   this is not useful, we generally do not store "descriptions". we have to 
drop this field



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java:
##########
@@ -146,6 +147,75 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
         jcommander.addCommand("remove-schema-compatibility-strategy", new 
RemoveSchemaCompatibilityStrategy());
         jcommander.addCommand("set-schema-compatibility-strategy", new 
SetSchemaCompatibilityStrategy());
         jcommander.addCommand("get-schema-compatibility-strategy", new 
GetSchemaCompatibilityStrategy());
+
+        jcommander.addCommand("get-entry-filters-per-topic", new 
GetEntryFiltersPerTopic());
+        jcommander.addCommand("set-entry-filters-per-topic", new 
setEntryFiltersPerTopic());
+        jcommander.addCommand("remove-entry-filters-per-topic", new 
RemoveEntryFiltersPerTopic());
+    }
+
+    @Parameters(commandDescription = "Get entry filters for a topic")
+    private class GetEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get 
this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the 
applied policy of the topic")
+        private boolean applied = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            
print(getTopicPolicies(isGlobal).getEntryFiltersPerTopic(persistentTopic, 
applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set entry filters for a topic")
+    private class setEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--desc", "-d" },
+                description = "The description of the entry filter to be used 
for user help.", required = false)
+        private String  description = "";
+
+        @Parameter(names = { "--entry-filters-name", "-efn" },
+                description = "The class name for the entry filter.", required 
= true)
+        private String  entryFiltersName = "";
+
+
+        @Parameter(names = { "--entry-filters-dir", "-efd" },
+                description = " The directory for all the entry filter 
implementations.", required = true)
+        private String  entryFiltersDirectory = "";

Review Comment:
   as in other places please drop this field



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -321,9 +322,6 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-stats-updater"));
         this.authorizationService = new AuthorizationService(
                 pulsar.getConfiguration(), pulsar().getPulsarResources());
-        if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {

Review Comment:
   it is better still keep a list of global entry filters and initialise them 
only once.
   
   in a broker with thousands of topics the overhead may be non negligible, a 
EntryFilterFactory may have to perform some initialisation at boot  
   
   also a EntryFilter may have some local cache.
   in 2.10 the EntryFilter instance is global, so the cache is shared, if we 
create a filter entry per each topic then this kind of caching will be useless.
   
   I think that we have to keep a list of EntryFilters globally, per-namespace 
and per-topic.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java:
##########
@@ -2651,5 +2652,66 @@ public void finished(int total, int errors, int unknown) 
throws Exception {
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/entryFilters")
+    @ApiOperation(value = "Get maxConsumersPerSubscription config on a 
namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public void getEntryFiltersPerTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(polices -> 
asyncResponse.resume(polices.entryFilters))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get entry filters config on 
namespace {}: {} ",
+                            clientAppId(), namespaceName, 
ex.getCause().getMessage(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/entryFilters")
+    @ApiOperation(value = "Set entry filters for namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist")})
+    public void setEntryFiltersPerTopic(@Suspended AsyncResponse 
asyncResponse, @PathParam("tenant") String tenant,
+                                       @PathParam("namespace") String 
namespace,
+                                       @ApiParam(value = "entry filters", 
required = true)
+                                               EntryFilters entryFilters) {

Review Comment:
   the client must not be able to set the directory, but only the list of 
enabled filters



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java:
##########
@@ -214,6 +211,10 @@ protected boolean isConsumersExceededOnSubscription() {
 
         @Override
         public boolean trackDelayedDelivery(long ledgerId, long entryId, 
MessageMetadata msgMetadata) {
+            if (!msgMetadata.hasDeliverAtTime()) {

Review Comment:
   this change seems unrelated to the patch



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java:
##########
@@ -146,6 +147,75 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
         jcommander.addCommand("remove-schema-compatibility-strategy", new 
RemoveSchemaCompatibilityStrategy());
         jcommander.addCommand("set-schema-compatibility-strategy", new 
SetSchemaCompatibilityStrategy());
         jcommander.addCommand("get-schema-compatibility-strategy", new 
GetSchemaCompatibilityStrategy());
+
+        jcommander.addCommand("get-entry-filters-per-topic", new 
GetEntryFiltersPerTopic());
+        jcommander.addCommand("set-entry-filters-per-topic", new 
setEntryFiltersPerTopic());
+        jcommander.addCommand("remove-entry-filters-per-topic", new 
RemoveEntryFiltersPerTopic());
+    }
+
+    @Parameters(commandDescription = "Get entry filters for a topic")
+    private class GetEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get 
this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the 
applied policy of the topic")
+        private boolean applied = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            
print(getTopicPolicies(isGlobal).getEntryFiltersPerTopic(persistentTopic, 
applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set entry filters for a topic")
+    private class setEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--desc", "-d" },
+                description = "The description of the entry filter to be used 
for user help.", required = false)
+        private String  description = "";

Review Comment:
   as in other places please drop this field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to