This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 503457b2591 [fix][broker] Pass subscriptionName to auth service (#17123) (#19423)
503457b2591 is described below

commit 503457b2591890877cdf1e70bc1a9336f7c13279
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Feb 4 04:52:07 2023 +0800

    [fix][broker] Pass subscriptionName to auth service (#17123) (#19423)
    
    Co-authored-by: Michael Marshall <[email protected]>
    
    ### Motivation
    
    Cherry-pick https://github.com/apache/pulsar/pull/17123
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    *(Please pick either of the following options)*
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    *(or)*
    
    This change is already covered by existing tests, such as *(please describe tests)*.
    
    *(or)*
    
    This change added tests and can be verified as follows:
    
    *(example:)*
      - *Added integration tests for end-to-end deployment with large payloads (10MB)*
      - *Extended integration test for recovery after broker failure*
    
    ### Does this pull request potentially affect one of the following parts:
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    *If the box was checked, please highlight the changes*
    
    - [ ] Dependencies (add or upgrade a dependency)
    - [ ] The public API
    - [ ] The schema
    - [ ] The default values of configurations
    - [ ] The threading model
    - [ ] The binary protocol
    - [ ] The REST endpoints
    - [ ] The admin CLI options
    - [ ] The metrics
    - [ ] Anything that affects deployment
    
    ### Documentation
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc` <!-- Your PR contains doc changes. -->
    - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
    - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
    - [ ] `doc-complete` <!-- Docs have been already added -->
    
    ### Matching PR in forked repository
    
    PR in forked repository: <!-- ENTER URL HERE -->
    
    <!--
    After opening this PR, the build in apache/pulsar will fail and instructions will
    be provided for opening a PR in the PR author's forked repository.
    
    apache/pulsar pull requests should be first tested in your own fork since the
    apache/pulsar CI based on GitHub Actions has constrained resources and quota.
    GitHub Actions provides separate quota for pull requests that are executed in
    a forked repository.
    
    The tests will be run in the forked repository until all PR review comments have
    been handled, the tests pass and the PR is approved by a reviewer.
    -->
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java   | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4fa7ff79bfe..22f47bf9751 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1608,7 +1608,7 @@ public class PersistentTopicsBase extends AdminResource {
     private void 
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                                   String 
subName, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-            .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+            .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE, subName))
             .thenCompose(__ -> {
                 Topic topic = getTopicReference(topicName);
                 Subscription sub = topic.getSubscription(subName);
@@ -1641,7 +1641,7 @@ public class PersistentTopicsBase extends AdminResource {
                                       String subName, Map<String, String> 
subscriptionProperties,
                                       boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.CONSUME))
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.CONSUME, subName))
                 .thenCompose(__ -> {
                     Topic topic = getTopicReference(topicName);
                     Subscription sub = topic.getSubscription(subName);
@@ -1740,7 +1740,7 @@ public class PersistentTopicsBase extends AdminResource {
     private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse 
asyncResponse,
                                                                             
String subName, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE))
+                .thenRun(() -> validateTopicOperation(topicName, 
TopicOperation.UNSUBSCRIBE, subName))
                 .thenCompose(__ -> {
                     Topic topic = getTopicReference(topicName);
                     Subscription sub = topic.getSubscription(subName);
@@ -1883,7 +1883,7 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP, subName))
                 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
                      .thenCompose(partitionMetadata -> {
                          if (partitionMetadata.partitions > 0) {
@@ -2289,7 +2289,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         validateTopicOwnershipAsync(topicName, authoritative)
                 .thenCompose(__ -> {
-                    validateTopicOperation(topicName, 
TopicOperation.SUBSCRIBE);
+                    validateTopicOperation(topicName, 
TopicOperation.SUBSCRIBE, subscriptionName);
                     return 
pulsar().getBrokerService().getTopic(topicName.toString(), 
isAllowAutoTopicCreation);
                 }).thenApply(optTopic -> {
             if (optTopic.isPresent()) {
@@ -2687,7 +2687,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
+        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES, 
subName);
 
         if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
             log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName,
@@ -3575,7 +3575,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
         future.thenCompose(__ ->
                 validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(unused2 ->
                         // If the topic name is a partition name, no need to 
get partition topic metadata again
                         getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -3724,7 +3724,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(__ -> {
                     log.info("[{}][{}] received expire messages on 
subscription {} to position {}", clientAppId(),
                             topicName, subName, messageId);

Reply via email to