eolivelli commented on code in PR #17153: URL: https://github.com/apache/pulsar/pull/17153#discussion_r949016124
########## pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java: ########## @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class EntryFilters { + /** + * The description of the entry filter to be used for user help. + */ + private String description; + + /** + * The class name for the entry filter. + */ + private String entryFilterNames; + + /** + * The directory for all the entry filter implementations. + */ + private String entryFiltersDirectory; Review Comment: This is a parameter that is only useful only in ServiceConfiguration (broker.conf) because it refers to the local filesystem of every specific broker. I think we should drop this from here ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java: ########## @@ -2651,5 +2652,66 @@ public void finished(int total, int errors, int unknown) throws Exception { } } + @GET + @Path("/{tenant}/{namespace}/entryFilters") + @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist") }) + public void getEntryFiltersPerTopic( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(polices -> asyncResponse.resume(polices.entryFilters)) Review Comment: we must not return the entry filters directory to clients, as it will disclose sensitive information about the broker filesystem please create a new java class to model the response and report only the list of enabled entry filters ########## pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java: ########## @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class EntryFilters { + /** + * The description of the entry filter to be used for user help. + */ + private String description; Review Comment: this is not useful, we generally do not store "descriptions". we have to drop this field ########## pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java: ########## @@ -146,6 +147,75 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) { jcommander.addCommand("remove-schema-compatibility-strategy", new RemoveSchemaCompatibilityStrategy()); jcommander.addCommand("set-schema-compatibility-strategy", new SetSchemaCompatibilityStrategy()); jcommander.addCommand("get-schema-compatibility-strategy", new GetSchemaCompatibilityStrategy()); + + jcommander.addCommand("get-entry-filters-per-topic", new GetEntryFiltersPerTopic()); + jcommander.addCommand("set-entry-filters-per-topic", new setEntryFiltersPerTopic()); + jcommander.addCommand("remove-entry-filters-per-topic", new RemoveEntryFiltersPerTopic()); + } + + @Parameters(commandDescription = "Get entry filters for a topic") + private class GetEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getTopicPolicies(isGlobal).getEntryFiltersPerTopic(persistentTopic, applied)); + } + } + + @Parameters(commandDescription = "Set entry filters for a topic") + private class setEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--desc", "-d" }, + description = "The description of the entry filter to be used for user help.", required = false) + private String description = ""; + + @Parameter(names = { "--entry-filters-name", "-efn" }, + description = "The class name for the entry filter.", required = true) + private String entryFiltersName = ""; + + + @Parameter(names = { "--entry-filters-dir", "-efd" }, + description = " The directory for all the entry filter implementations.", required = true) + private String entryFiltersDirectory = ""; Review Comment: as in other places please drop this field ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ########## @@ -321,9 +322,6 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater")); this.authorizationService = new AuthorizationService( pulsar.getConfiguration(), pulsar().getPulsarResources()); - if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) { Review Comment: it is better still keep a list of global entry filters and initialise them only once. in a broker with thousands of topics the overhead may be non negligible, a EntryFilterFactory may have to perform some initialisation at boot also a EntryFilter may have some local cache. in 2.10 the EntryFilter instance is global, so the cache is shared, if we create a filter entry per each topic then this kind of caching will be useless. I think that we have to keep a list of EntryFilters globally, per-namespace and per-topic. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java: ########## @@ -2651,5 +2652,66 @@ public void finished(int total, int errors, int unknown) throws Exception { } } + @GET + @Path("/{tenant}/{namespace}/entryFilters") + @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist") }) + public void getEntryFiltersPerTopic( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(polices -> asyncResponse.resume(polices.entryFilters)) + .exceptionally(ex -> { + log.error("[{}] Failed to get entry filters config on namespace {}: {} ", + clientAppId(), namespaceName, ex.getCause().getMessage(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/entryFilters") + @ApiOperation(value = "Set entry filters for namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) + public void setEntryFiltersPerTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @ApiParam(value = "entry filters", required = true) + EntryFilters entryFilters) { Review Comment: the client must not be able to set the directory, but only the list of enabled filters ########## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java: ########## @@ -214,6 +211,10 @@ protected boolean isConsumersExceededOnSubscription() { @Override public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { + if (!msgMetadata.hasDeliverAtTime()) { Review Comment: this change seems unrelated to the patch ########## pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java: ########## @@ -146,6 +147,75 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) { jcommander.addCommand("remove-schema-compatibility-strategy", new RemoveSchemaCompatibilityStrategy()); jcommander.addCommand("set-schema-compatibility-strategy", new SetSchemaCompatibilityStrategy()); jcommander.addCommand("get-schema-compatibility-strategy", new GetSchemaCompatibilityStrategy()); + + jcommander.addCommand("get-entry-filters-per-topic", new GetEntryFiltersPerTopic()); + jcommander.addCommand("set-entry-filters-per-topic", new setEntryFiltersPerTopic()); + jcommander.addCommand("remove-entry-filters-per-topic", new RemoveEntryFiltersPerTopic()); + } + + @Parameters(commandDescription = "Get entry filters for a topic") + private class GetEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getTopicPolicies(isGlobal).getEntryFiltersPerTopic(persistentTopic, applied)); + } + } + + @Parameters(commandDescription = "Set entry filters for a topic") + private class setEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--desc", "-d" }, + description = "The description of the entry filter to be used for user help.", required = false) + private String description = ""; Review Comment: as in other places please drop this field -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
