This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e9c3e21c7a3 [fix][broker] Pass subscriptionName to auth service (#17123)
e9c3e21c7a3 is described below
commit e9c3e21c7a39565e15df1671357dc404527c533f
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Aug 18 10:48:53 2022 -0700
[fix][broker] Pass subscriptionName to auth service (#17123)
---
.../broker/admin/impl/PersistentTopicsBase.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index de11d49d773..6521cee29ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1555,7 +1555,7 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String
subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
+ .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
@@ -1590,7 +1590,7 @@ public class PersistentTopicsBase extends AdminResource {
Optional<Position> position,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
@@ -1646,7 +1646,7 @@ public class PersistentTopicsBase extends AdminResource {
String subName, Map<String, String>
subscriptionProperties,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+ .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
@@ -1673,7 +1673,7 @@ public class PersistentTopicsBase extends AdminResource {
String subName,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+ .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenApply((Topic topic) -> {
Subscription sub = topic.getSubscription(subName);
@@ -1776,7 +1776,7 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse
asyncResponse,
String subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
+ .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
@@ -1923,7 +1923,7 @@ public class PersistentTopicsBase extends AdminResource {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
- .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
.thenCompose(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
@@ -2332,7 +2332,7 @@ public class PersistentTopicsBase extends AdminResource {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
- validateTopicOperation(topicName, TopicOperation.SUBSCRIBE);
+ validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName);
return
pulsar().getBrokerService().getTopic(topicName.toString(),
isAllowAutoTopicCreation);
}).thenApply(optTopic -> {
if (optTopic.isPresent()) {
@@ -2870,7 +2870,7 @@ public class PersistentTopicsBase extends AdminResource {
ret = CompletableFuture.completedFuture(null);
}
return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
- .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
CompletableFuture<Entry> entry;
@@ -3790,7 +3790,7 @@ public class PersistentTopicsBase extends AdminResource {
}
future.thenCompose(__ ->
validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
+ .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(unused2 ->
// If the topic name is a partition name, no need to get partition topic metadata again
getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
@@ -3942,7 +3942,7 @@ public class PersistentTopicsBase extends AdminResource {
}
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
- .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(__ -> {
log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);