This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 503457b2591 [fix][broker] Pass subscriptionName to auth service
(#17123) (#19423)
503457b2591 is described below
commit 503457b2591890877cdf1e70bc1a9336f7c13279
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Feb 4 04:52:07 2023 +0800
[fix][broker] Pass subscriptionName to auth service (#17123) (#19423)
Co-authored-by: Michael Marshall <[email protected]>
### Motivation
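Cherry-pick https://github.com/apache/pulsar/pull/17123 to branch-2.10 so that
the subscription name is passed to the authorization service when
subscription-level topic operations are validated.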
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)*
This change is already covered by existing tests, such as *(please describe
tests)*.
*(or)*
This change added tests and can be verified as follows:
*(example:)*
- *Added integration tests for end-to-end deployment with large payloads
(10MB)*
- *Extended integration test for recovery after broker failure*
### Does this pull request potentially affect one of the following parts:
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
### Matching PR in forked repository
PR in forked repository: <!-- ENTER URL HERE -->
<!--
After opening this PR, the build in apache/pulsar will fail and instructions
will be provided for opening a PR in the PR author's forked repository.
apache/pulsar pull requests should be first tested in your own fork since the
apache/pulsar CI based on GitHub Actions has constrained resources and quota.
GitHub Actions provides separate quota for pull requests that are executed in
a forked repository.
The tests will be run in the forked repository until all PR review comments
have been handled, the tests pass and the PR is approved by a reviewer.
-->
---
.../pulsar/broker/admin/impl/PersistentTopicsBase.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4fa7ff79bfe..22f47bf9751 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1608,7 +1608,7 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String
subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE))
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
@@ -1641,7 +1641,7 @@ public class PersistentTopicsBase extends AdminResource {
String subName, Map<String, String>
subscriptionProperties,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.CONSUME))
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.CONSUME, subName))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
@@ -1740,7 +1740,7 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse
asyncResponse,
String subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE))
+ .thenRun(() -> validateTopicOperation(topicName,
TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
@@ -1883,7 +1883,7 @@ public class PersistentTopicsBase extends AdminResource {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.SKIP))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.SKIP, subName))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
.thenCompose(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
@@ -2289,7 +2289,7 @@ public class PersistentTopicsBase extends AdminResource {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
- validateTopicOperation(topicName,
TopicOperation.SUBSCRIBE);
+ validateTopicOperation(topicName,
TopicOperation.SUBSCRIBE, subscriptionName);
return
pulsar().getBrokerService().getTopic(topicName.toString(),
isAllowAutoTopicCreation);
}).thenApply(optTopic -> {
if (optTopic.isPresent()) {
@@ -2687,7 +2687,7 @@ public class PersistentTopicsBase extends AdminResource {
}
validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
+ validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES,
subName);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName,
@@ -3575,7 +3575,7 @@ public class PersistentTopicsBase extends AdminResource {
}
future.thenCompose(__ ->
validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(unused -> validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES))
+ .thenCompose(unused -> validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(unused2 ->
// If the topic name is a partition name, no need to
get partition topic metadata again
getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
@@ -3724,7 +3724,7 @@ public class PersistentTopicsBase extends AdminResource {
}
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(__ -> {
log.info("[{}][{}] received expire messages on
subscription {} to position {}", clientAppId(),
topicName, subName, messageId);