This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e9c3e21c7a3 [fix][broker] Pass subscriptionName to auth service (#17123)
e9c3e21c7a3 is described below

commit e9c3e21c7a39565e15df1671357dc404527c533f
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Aug 18 10:48:53 2022 -0700

    [fix][broker] Pass subscriptionName to auth service (#17123)
---
 .../broker/admin/impl/PersistentTopicsBase.java      | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index de11d49d773..6521cee29ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1555,7 +1555,7 @@ public class PersistentTopicsBase extends AdminResource {
     private void 
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                                   String 
subName, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-            .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+            .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE, subName))
             .thenCompose(__ -> getTopicReferenceAsync(topicName))
             .thenCompose(topic -> {
                 Subscription sub = topic.getSubscription(subName);
@@ -1590,7 +1590,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                                           
Optional<Position> position,
                                                                           
boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.CONSUME))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.CONSUME, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenCompose(topic -> {
                             Subscription sub = topic.getSubscription(subName);
@@ -1646,7 +1646,7 @@ public class PersistentTopicsBase extends AdminResource {
                                       String subName, Map<String, String> 
subscriptionProperties,
                                       boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.CONSUME))
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.CONSUME, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenCompose(topic -> {
                     Subscription sub = topic.getSubscription(subName);
@@ -1673,7 +1673,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                                             
String subName,
                                                                             
boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.CONSUME))
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.CONSUME, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenApply((Topic topic) -> {
                     Subscription sub = topic.getSubscription(subName);
@@ -1776,7 +1776,7 @@ public class PersistentTopicsBase extends AdminResource {
     private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse 
asyncResponse,
                                                                             
String subName, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenCompose(topic -> {
                     Subscription sub = topic.getSubscription(subName);
@@ -1923,7 +1923,7 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP, subName))
                 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
                      .thenCompose(partitionMetadata -> {
                          if (partitionMetadata.partitions > 0) {
@@ -2332,7 +2332,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         validateTopicOwnershipAsync(topicName, authoritative)
                 .thenCompose(__ -> {
-                    validateTopicOperation(topicName, 
TopicOperation.SUBSCRIBE);
+                    validateTopicOperation(topicName, 
TopicOperation.SUBSCRIBE, subscriptionName);
                     return 
pulsar().getBrokerService().getTopic(topicName.toString(), 
isAllowAutoTopicCreation);
                 }).thenApply(optTopic -> {
             if (optTopic.isPresent()) {
@@ -2870,7 +2870,7 @@ public class PersistentTopicsBase extends AdminResource {
             ret = CompletableFuture.completedFuture(null);
         }
         return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES, subName))
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenCompose(topic -> {
                     CompletableFuture<Entry> entry;
@@ -3790,7 +3790,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
         future.thenCompose(__ ->
                 validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(unused2 ->
                         // If the topic name is a partition name, no need to 
get partition topic metadata again
                         getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -3942,7 +3942,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(__ -> {
                     log.info("[{}][{}] received expire messages on 
subscription {} to position {}", clientAppId(),
                             topicName, subName, messageId);

Reply via email to