This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch 2.10.4/pass_subName in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a1d17f9e7c28a036546a8de8996d69b369f7a87a Author: Michael Marshall <[email protected]> AuthorDate: Thu Aug 18 10:48:53 2022 -0700 [fix][broker] Pass subscriptionName to auth service (#17123) (cherry picked from commit e9c3e21c7a39565e15df1671357dc404527c533f) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4fa7ff79bfe..22f47bf9751 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1608,7 +1608,7 @@ public class PersistentTopicsBase extends AdminResource { private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE)) + .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName)) .thenCompose(__ -> { Topic topic = getTopicReference(topicName); Subscription sub = topic.getSubscription(subName); @@ -1641,7 +1641,7 @@ public class PersistentTopicsBase extends AdminResource { String subName, Map<String, String> subscriptionProperties, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME)) + .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName)) .thenCompose(__ -> { Topic topic = getTopicReference(topicName); Subscription sub = topic.getSubscription(subName); @@ -1740,7 +1740,7 @@ public class PersistentTopicsBase extends AdminResource { private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE)) + .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName)) .thenCompose(__ -> { Topic topic = getTopicReference(topicName); Subscription sub = topic.getSubscription(subName); @@ -1883,7 +1883,7 @@ public class PersistentTopicsBase extends AdminResource { future = CompletableFuture.completedFuture(null); } future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false) .thenCompose(partitionMetadata -> { if (partitionMetadata.partitions > 0) { @@ -2289,7 +2289,7 @@ public class PersistentTopicsBase extends AdminResource { validateTopicOwnershipAsync(topicName, authoritative) .thenCompose(__ -> { - validateTopicOperation(topicName, TopicOperation.SUBSCRIBE); + validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName); return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation); }).thenApply(optTopic -> { if (optTopic.isPresent()) { @@ -2687,7 +2687,7 @@ public class PersistentTopicsBase extends AdminResource { } validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES); + validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES, subName); if (!(getTopicReference(topicName) instanceof PersistentTopic)) { log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName, @@ -3575,7 +3575,7 @@ public class PersistentTopicsBase extends AdminResource { } 
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)) + .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName)) .thenCompose(unused2 -> // If the topic name is a partition name, no need to get partition topic metadata again getPartitionedTopicMetadataAsync(topicName, authoritative, false) @@ -3724,7 +3724,7 @@ public class PersistentTopicsBase extends AdminResource { } future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName)) .thenCompose(__ -> { log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(), topicName, subName, messageId);
