This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch 2.10.4/pass_subName
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a1d17f9e7c28a036546a8de8996d69b369f7a87a
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Aug 18 10:48:53 2022 -0700

    [fix][broker] Pass subscriptionName to auth service (#17123)
    
    (cherry picked from commit e9c3e21c7a39565e15df1671357dc404527c533f)
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java   | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4fa7ff79bfe..22f47bf9751 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1608,7 +1608,7 @@ public class PersistentTopicsBase extends AdminResource {
     private void 
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                                   String 
subName, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-            .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+            .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE, subName))
             .thenCompose(__ -> {
                 Topic topic = getTopicReference(topicName);
                 Subscription sub = topic.getSubscription(subName);
@@ -1641,7 +1641,7 @@ public class PersistentTopicsBase extends AdminResource {
                                       String subName, Map<String, String> 
subscriptionProperties,
                                       boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.CONSUME))
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.CONSUME, subName))
                 .thenCompose(__ -> {
                     Topic topic = getTopicReference(topicName);
                     Subscription sub = topic.getSubscription(subName);
@@ -1740,7 +1740,7 @@ public class PersistentTopicsBase extends AdminResource {
     private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse 
asyncResponse,
                                                                             
String subName, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE, subName))
                 .thenCompose(__ -> {
                     Topic topic = getTopicReference(topicName);
                     Subscription sub = topic.getSubscription(subName);
@@ -1883,7 +1883,7 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP, subName))
                 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
                      .thenCompose(partitionMetadata -> {
                          if (partitionMetadata.partitions > 0) {
@@ -2289,7 +2289,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         validateTopicOwnershipAsync(topicName, authoritative)
                 .thenCompose(__ -> {
-                    validateTopicOperation(topicName, 
TopicOperation.SUBSCRIBE);
+                    validateTopicOperation(topicName, 
TopicOperation.SUBSCRIBE, subscriptionName);
                     return 
pulsar().getBrokerService().getTopic(topicName.toString(), 
isAllowAutoTopicCreation);
                 }).thenApply(optTopic -> {
             if (optTopic.isPresent()) {
@@ -2687,7 +2687,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
+        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES, 
subName);
 
         if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
             log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName,
@@ -3575,7 +3575,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
         future.thenCompose(__ ->
                 validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(unused2 ->
                         // If the topic name is a partition name, no need to 
get partition topic metadata again
                         getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -3724,7 +3724,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(__ -> {
                     log.info("[{}][{}] received expire messages on 
subscription {} to position {}", clientAppId(),
                             topicName, subName, messageId);

Reply via email to