This is an automated email from the ASF dual-hosted git repository. mmarshall pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7990948a73e [refactor][broker] Use AuthenticationParameters for rest producer (#20046) 7990948a73e is described below commit 7990948a73e2c4dfa0e9c99ff223f0ee90e82dc3 Author: Michael Marshall <mmarsh...@apache.org> AuthorDate: Mon Apr 10 12:13:13 2023 -0500 [refactor][broker] Use AuthenticationParameters for rest producer (#20046) ### Motivation In #19975, we introduced a wrapper for all authentication parameters. This PR adds that wrapper to the Rest Producer. ### Modifications * Use `AuthenticationParameters` to simplify parameter management in Rest Producer. * Add method to the `AuthorizationService` that takes the `AuthenticationParameters`. * Update annotations on Rest Producer to indicate that a 401 is an expected response. ### Verifying this change This change is covered by the `TopicsAuthTest`. ### Documentation - [x] `doc-not-needed` This is an internal change that does not need to be documented. ### Matching PR in forked repository PR in forked repository: skipping PR since the relevant tests pass locally --- .../broker/authorization/AuthorizationService.java | 7 ++++++ .../java/org/apache/pulsar/broker/rest/Topics.java | 4 ++++ .../org/apache/pulsar/broker/rest/TopicsBase.java | 25 ++++++++++++++++------ 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 6c730f20092..706eadf0ec2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -748,6 +748,13 @@ public class AuthorizationService { } } + public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, + TopicOperation operation, + AuthenticationParameters authParams) { + return allowTopicOperationAsync(topicName, operation, authParams.getOriginalPrincipal(), + authParams.getClientRole(), authParams.getClientAuthenticationDataSource()); + } + public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, TopicOperation operation, String originalRole, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java index 5b17b05db20..f1e7009df02 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java @@ -49,6 +49,7 @@ public class Topics extends TopicsBase { @Path("/persistent/{tenant}/{namespace}/{topic}") @ApiOperation(value = "Produce message to a persistent topic.", response = String.class, responseContainer = "List") @ApiResponses(value = { + @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 412, message = "Namespace name is not valid"), @ApiResponse(code = 500, message = "Internal server error") }) @@ -76,6 +77,7 @@ public class Topics extends TopicsBase { @ApiOperation(value = "Produce message to a partition of a persistent topic.", response = String.class, responseContainer = "List") @ApiResponses(value = { + @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 412, message = "Namespace name is not valid"), @ApiResponse(code = 500, message = "Internal server error") }) @@ -104,6 +106,7 @@ public class Topics extends TopicsBase { @Path("/non-persistent/{tenant}/{namespace}/{topic}") @ApiOperation(value = "Produce message to a persistent topic.", response = String.class, responseContainer = "List") @ApiResponses(value = { + @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 412, message = "Namespace name is not valid"), @ApiResponse(code = 500, message = "Internal server error") }) @@ -132,6 +135,7 @@ public class Topics extends TopicsBase { @ApiOperation(value = "Produce message to a partition of a persistent topic.", response = String.class, responseContainer = "List") @ApiResponses(value = { + @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), @ApiResponse(code = 412, message = "Namespace name is not valid"), @ApiResponse(code = 500, message = "Internal server error") }) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java index 4614eb59a70..7d3aa37fa4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.rest; +import static java.util.concurrent.TimeUnit.SECONDS; import com.fasterxml.jackson.databind.ObjectReader; import io.netty.buffer.ByteBuf; import java.io.IOException; @@ -54,6 +55,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; +import org.apache.pulsar.broker.authentication.AuthenticationParameters; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -81,6 +83,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; @@ -762,14 +765,24 @@ public class TopicsBase extends PersistentTopicsBase { if (!isClientAuthenticated(clientAppId())) { throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request"); } + AuthenticationParameters authParams = authParams(); + boolean isAuthorized; + try { + isAuthorized = pulsar().getBrokerService().getAuthorizationService() + .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, authParams) + .get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } catch (InterruptedException e) { + log.warn("Time-out {} sec while checking authorization on {} ", + config().getMetadataStoreOperationTimeoutSeconds(), topicName); + throw new RestException(Status.INTERNAL_SERVER_ERROR, "Time-out while checking authorization"); + } catch (Exception e) { + log.warn("Producer-client with Role - {} {} failed to get permissions for topic - {}. {}", + authParams.getClientRole(), authParams.getOriginalPrincipal(), topicName, e.getMessage()); + throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get permissions"); + } - boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .canProduce(topicName, originalPrincipal() == null ? clientAppId() : originalPrincipal(), - clientAuthData()); if (!isAuthorized) { - throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to produce to topic %s" - + " with clientAppId [%s] and authdata %s", topicName.toString(), - clientAppId(), clientAuthData())); + throw new RestException(Status.UNAUTHORIZED, "Unauthorized to produce to topic " + topicName); } } }