This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7990948a73e [refactor][broker] Use AuthenticationParameters for rest 
producer (#20046)
7990948a73e is described below

commit 7990948a73e2c4dfa0e9c99ff223f0ee90e82dc3
Author: Michael Marshall <mmarsh...@apache.org>
AuthorDate: Mon Apr 10 12:13:13 2023 -0500

    [refactor][broker] Use AuthenticationParameters for rest producer (#20046)
    
    ### Motivation
    
    In #19975, we introduced a wrapper for all authentication parameters. This 
PR adds that wrapper to the Rest Producer.
    
    ### Modifications
    
    * Use `AuthenticationParameters` to simplify parameter management in Rest 
Producer.
    * Add method to the `AuthorizationService` that takes the 
`AuthenticationParameters`.
    * Update annotations on Rest Producer to indicate that a 401 is an expected 
response.
    
    ### Verifying this change
    
    This change is covered by the `TopicsAuthTest`.
    
    ### Documentation
    
    - [x] `doc-not-needed`
    
    This is an internal change that does not need to be documented.
    
    ### Matching PR in forked repository
    
    PR in forked repository: skipping PR since the relevant tests pass locally
---
 .../broker/authorization/AuthorizationService.java |  7 ++++++
 .../java/org/apache/pulsar/broker/rest/Topics.java |  4 ++++
 .../org/apache/pulsar/broker/rest/TopicsBase.java  | 25 ++++++++++++++++------
 3 files changed, 30 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 6c730f20092..706eadf0ec2 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -748,6 +748,13 @@ public class AuthorizationService {
         }
     }
 
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topicName,
+                                                               TopicOperation 
operation,
+                                                               
AuthenticationParameters authParams) {
+        return allowTopicOperationAsync(topicName, operation, 
authParams.getOriginalPrincipal(),
+                authParams.getClientRole(), 
authParams.getClientAuthenticationDataSource());
+    }
+
     public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topicName,
                                                                TopicOperation 
operation,
                                                                String 
originalRole,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
index 5b17b05db20..f1e7009df02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
@@ -49,6 +49,7 @@ public class Topics extends TopicsBase {
     @Path("/persistent/{tenant}/{namespace}/{topic}")
     @ApiOperation(value = "Produce message to a persistent topic.", response = 
String.class, responseContainer = "List")
     @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Client is not authorized to 
perform operation"),
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
@@ -76,6 +77,7 @@ public class Topics extends TopicsBase {
     @ApiOperation(value = "Produce message to a partition of a persistent 
topic.",
             response = String.class, responseContainer = "List")
     @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Client is not authorized to 
perform operation"),
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
@@ -104,6 +106,7 @@ public class Topics extends TopicsBase {
     @Path("/non-persistent/{tenant}/{namespace}/{topic}")
     @ApiOperation(value = "Produce message to a persistent topic.", response = 
String.class, responseContainer = "List")
     @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Client is not authorized to 
perform operation"),
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
@@ -132,6 +135,7 @@ public class Topics extends TopicsBase {
     @ApiOperation(value = "Produce message to a partition of a persistent 
topic.",
             response = String.class, responseContainer = "List")
     @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Client is not authorized to 
perform operation"),
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
             @ApiResponse(code = 412, message = "Namespace name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 4614eb59a70..7d3aa37fa4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.rest;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import com.fasterxml.jackson.databind.ObjectReader;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
@@ -54,6 +55,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.authentication.AuthenticationParameters;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -81,6 +83,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -762,14 +765,24 @@ public class TopicsBase extends PersistentTopicsBase {
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.UNAUTHORIZED, "Need to 
authenticate to perform the request");
             }
+            AuthenticationParameters authParams = authParams();
+            boolean isAuthorized;
+            try {
+                isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
+                        .allowTopicOperationAsync(topicName, 
TopicOperation.PRODUCE, authParams)
+                        
.get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+            } catch (InterruptedException e) {
+                log.warn("Time-out {} sec while checking authorization on {} ",
+                        config().getMetadataStoreOperationTimeoutSeconds(), 
topicName);
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, 
"Time-out while checking authorization");
+            } catch (Exception e) {
+                log.warn("Producer-client  with Role - {} {} failed to get 
permissions for topic - {}. {}",
+                        authParams.getClientRole(), 
authParams.getOriginalPrincipal(), topicName, e.getMessage());
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed 
to get permissions");
+            }
 
-            boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
-                    .canProduce(topicName, originalPrincipal() == null ? 
clientAppId() : originalPrincipal(),
-                            clientAuthData());
             if (!isAuthorized) {
-                throw new RestException(Status.UNAUTHORIZED, 
String.format("Unauthorized to produce to topic %s"
-                                        + " with clientAppId [%s] and authdata 
%s", topicName.toString(),
-                        clientAppId(), clientAuthData()));
+                throw new RestException(Status.UNAUTHORIZED, "Unauthorized to 
produce to topic " + topicName);
             }
         }
     }

Reply via email to