This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 90caa1c621a93c51a12e6d29e0ae213a985174e7 Author: Jiwei Guo <[email protected]> AuthorDate: Fri Apr 22 19:46:11 2022 +0800 Put `validateTopicOwnershipAsync` before `validateTopicOperationAsync` (#15265) (cherry picked from commit 41f40f06c4c4d74939bca07a9b83bda020147346) --- .../broker/admin/impl/PersistentTopicsBase.java | 33 +++++++++++----------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0b92e1ab981..b286a6000dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -572,8 +572,9 @@ public class PersistentTopicsBase extends AdminResource { protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force, boolean deleteSchema) { - validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC) - .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(), + NamespaceOperation.DELETE_TOPIC)) .thenCompose(__ -> pulsar().getBrokerService() .fetchPartitionedTopicMetadataAsync(topicName) .thenCompose(partitionedMeta -> { @@ -963,8 +964,8 @@ public class PersistentTopicsBase extends AdminResource { } private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) { - validateTopicOperationAsync(topicName, TopicOperation.UNLOAD) - .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative) + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> topic.close(false)) .thenRun(() -> { @@ -982,8 +983,8 @@ public class PersistentTopicsBase extends AdminResource { } private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) { - validateTopicOperationAsync(topicName, TopicOperation.UNLOAD) - .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative) + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD) .thenCompose(v -> pulsar() .getTransactionMetadataStoreService() .removeTransactionMetadataStore( @@ -1040,8 +1041,8 @@ public class PersistentTopicsBase extends AdminResource { future = CompletableFuture.completedFuture(null); } future.thenCompose(__ -> - validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS) - .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative)) + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)) .thenAccept(unused1 -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { @@ -1774,8 +1775,8 @@ public class PersistentTopicsBase extends AdminResource { } else { future = CompletableFuture.completedFuture(null); } - future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP)) - .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false) .thenCompose(partitionMetadata -> { if (partitionMetadata.partitions > 0) { @@ -1902,8 +1903,8 @@ public class PersistentTopicsBase extends AdminResource { int expireTimeInSeconds, boolean authoritative) { // validate ownership and redirect if current broker is not owner - validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES) - .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)) .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> { if (t == null) { resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.NOT_FOUND, @@ -3437,8 +3438,8 @@ public class PersistentTopicsBase extends AdminResource { future = CompletableFuture.completedFuture(null); } future.thenCompose(__ -> - validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES) - .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative)) + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)) .thenCompose(unused2 -> // If the topic name is a partition name, no need to get partition topic metadata again getPartitionedTopicMetadataAsync(topicName, authoritative, false) @@ -3586,8 +3587,8 @@ public class PersistentTopicsBase extends AdminResource { future = CompletableFuture.completedFuture(null); } - future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)) - .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)) .thenCompose(__ -> { log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(), topicName, subName, messageId);
