This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 90caa1c621a93c51a12e6d29e0ae213a985174e7
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Apr 22 19:46:11 2022 +0800

    Put `validateTopicOwnershipAsync` before `validateTopicOperationAsync` 
(#15265)
    
    (cherry picked from commit 41f40f06c4c4d74939bca07a9b83bda020147346)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 33 +++++++++++-----------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0b92e1ab981..b286a6000dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -572,8 +572,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, 
boolean authoritative,
                                                   boolean force, boolean 
deleteSchema) {
-        validateNamespaceOperationAsync(topicName.getNamespaceObject(), 
NamespaceOperation.DELETE_TOPIC)
-                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> 
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+                        NamespaceOperation.DELETE_TOPIC))
                 .thenCompose(__ -> pulsar().getBrokerService()
                         .fetchPartitionedTopicMetadataAsync(topicName)
                         .thenCompose(partitionedMeta -> {
@@ -963,8 +964,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private void internalUnloadNonPartitionedTopicAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
-        validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
-                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative)
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.UNLOAD)
                         .thenCompose(__ -> getTopicReferenceAsync(topicName))
                         .thenCompose(topic -> topic.close(false))
                         .thenRun(() -> {
@@ -982,8 +983,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private void internalUnloadTransactionCoordinatorAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
-        validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
-                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.UNLOAD)
                         .thenCompose(v -> pulsar()
                                 .getTransactionMetadataStoreService()
                                 .removeTransactionMetadataStore(
@@ -1040,8 +1041,8 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
         future.thenCompose(__ ->
-                validateTopicOperationAsync(topicName, 
TopicOperation.GET_SUBSCRIPTIONS)
-                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.GET_SUBSCRIPTIONS))
                 .thenAccept(unused1 -> {
                     // If the topic name is a partition name, no need to get 
partition topic metadata again
                     if (topicName.isPartitioned()) {
@@ -1774,8 +1775,8 @@ public class PersistentTopicsBase extends AdminResource {
         } else {
             future = CompletableFuture.completedFuture(null);
         }
-        future.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP))
-                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.SKIP))
                 .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
                      .thenCompose(partitionMetadata -> {
                          if (partitionMetadata.partitions > 0) {
@@ -1902,8 +1903,8 @@ public class PersistentTopicsBase extends AdminResource {
                                                                                
  int expireTimeInSeconds,
                                                                                
  boolean authoritative) {
         // validate ownership and redirect if current broker is not owner
-        validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
-                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
                 .thenCompose(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {
                      if (t == null) {
                          resumeAsyncResponseExceptionally(asyncResponse, new 
RestException(Status.NOT_FOUND,
@@ -3437,8 +3438,8 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
         future.thenCompose(__ ->
-            validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
-                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
                 .thenCompose(unused2 ->
                         // If the topic name is a partition name, no need to 
get partition topic metadata again
                         getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
@@ -3586,8 +3587,8 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
 
-        future.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
-                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES))
                 .thenCompose(__ -> {
                     log.info("[{}][{}] received expire messages on 
subscription {} to position {}", clientAppId(),
                             topicName, subName, messageId);

Reply via email to