This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 42cd494a8f8 [fix][broker] Pass subscriptionName to auth service (#17123)
42cd494a8f8 is described below
commit 42cd494a8f88ea55e617e4bfcaece4cf7df2f953
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Aug 18 10:48:53 2022 -0700
[fix][broker] Pass subscriptionName to auth service (#17123)
(cherry picked from commit e9c3e21c7a39565e15df1671357dc404527c533f)
---
.../pulsar/broker/admin/impl/PersistentTopicsBase.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7b747975306..ab403357e05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1490,7 +1490,7 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String
subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE))
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
@@ -1590,7 +1590,7 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse
asyncResponse,
String subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE))
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
@@ -1745,7 +1745,7 @@ public class PersistentTopicsBase extends AdminResource {
}
validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.SKIP);
+ validateTopicOperation(topicName, TopicOperation.SKIP, subName);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
@@ -2150,7 +2150,7 @@ public class PersistentTopicsBase extends AdminResource {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
- validateTopicOperation(topicName,
TopicOperation.SUBSCRIBE);
+ validateTopicOperation(topicName,
TopicOperation.SUBSCRIBE, subscriptionName);
return
pulsar().getBrokerService().getTopic(topicName.toString(),
isAllowAutoTopicCreation);
}).thenApply(optTopic -> {
if (optTopic.isPresent()) {
@@ -2447,7 +2447,7 @@ public class PersistentTopicsBase extends AdminResource {
}
validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
+ validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES,
subName);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName,
@@ -3293,7 +3293,7 @@ public class PersistentTopicsBase extends AdminResource {
}
validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
+ validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES,
subName);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName, subName);
@@ -3356,7 +3356,7 @@ public class PersistentTopicsBase extends AdminResource {
}
validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
+ validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES,
subName);
log.info("[{}][{}] received expire messages on subscription {} to
position {}", clientAppId(), topicName,
subName, messageId);