This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c6c7d6dd41cb2480408df35e29d652282f477587
Author: gaozhangmin <[email protected]>
AuthorDate: Thu Apr 21 09:59:37 2022 +0800

    Fix duplicate validateTopicOwnershipAsync (#15120)
    
    Co-authored-by: gavingaozhangmin <[email protected]>
    (cherry picked from commit 151f1d1d3e14df9166547d1aed829c774ccce99d)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 259 ++++++++++-----------
 1 file changed, 129 insertions(+), 130 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3c3f622266c..0b92e1ab981 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1039,11 +1039,13 @@ public class PersistentTopicsBase extends AdminResource 
{
         } else {
             future = CompletableFuture.completedFuture(null);
         }
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                .thenAccept(unused -> {
+        future.thenCompose(__ ->
+                validateTopicOperationAsync(topicName, 
TopicOperation.GET_SUBSCRIPTIONS)
+                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenAccept(unused1 -> {
                     // If the topic name is a partition name, no need to get 
partition topic metadata again
                     if (topicName.isPartitioned()) {
-                        
internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
+                        
internalGetSubscriptionsForNonPartitionedTopic(asyncResponse);
                     } else {
                         getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
                                 .thenAccept(partitionMetadata -> {
@@ -1061,7 +1063,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                     
topicResources().persistentTopicExists(topicName.getPartition(i)));
                                         }
                                         
FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values()))
-                                                .thenApply(__ ->
+                                                .thenApply(unused2 ->
                                                 
existsFutures.entrySet().stream().filter(e -> e.getValue().join())
                                                         .map(item -> 
topicName.getPartition(item.getKey()).toString())
                                                         
.collect(Collectors.toList())
@@ -1080,7 +1082,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                     throw new RestException(e);
                                                 }
                                             });
-                                        }).thenAccept(__ -> 
resumeAsyncResponse(asyncResponse,
+                                        }).thenAccept(unused3 -> 
resumeAsyncResponse(asyncResponse,
                                                         subscriptions, 
subscriptionFutures));
                                     } else {
                                         for (int i = 0; i < 
partitionMetadata.partitions; i++) {
@@ -1098,7 +1100,7 @@ public class PersistentTopicsBase extends AdminResource {
                                     asyncResponse.resume(e);
                                 }
                             } else {
-                                
internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
+                                
internalGetSubscriptionsForNonPartitionedTopic(asyncResponse);
                             }
                         }).exceptionally(ex -> {
                             // If the exception is not redirect exception we 
need to log it.
@@ -1118,7 +1120,8 @@ public class PersistentTopicsBase extends AdminResource {
                     }
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
-                });
+                })
+        );
     }
 
     private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> 
subscriptions,
@@ -1147,10 +1150,8 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
-    private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.GET_SUBSCRIPTIONS))
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+    private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse) {
+        getTopicReferenceAsync(topicName)
                 .thenAccept(topic -> 
asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys())))
                 .exceptionally(ex -> {
                     // If the exception is not redirect exception we need to 
log it.
@@ -1725,11 +1726,7 @@ public class PersistentTopicsBase extends AdminResource {
     private CompletableFuture<Void> 
internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
                                                                                
        String subName,
                                                                                
        boolean authoritative) {
-        return validateTopicOwnershipAsync(topicName, authoritative)
-            .thenCompose(__ ->
-                validateTopicOperationAsync(topicName, TopicOperation.SKIP, 
subName))
-            .thenCompose(__ ->
-                getTopicReferenceAsync(topicName).thenCompose(t -> {
+        return getTopicReferenceAsync(topicName).thenCompose(t -> {
                     PersistentTopic topic = (PersistentTopic) t;
                     BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
                         if (ex != null) {
@@ -1758,8 +1755,7 @@ public class PersistentTopicsBase extends AdminResource {
                         }
                         return sub.clearBacklog().whenComplete(biConsumer);
                     }
-                })
-                .exceptionally(ex -> {
+                }).exceptionally(ex -> {
                     // If the exception is not redirect exception we need to 
log it.
                     if (!isRedirectException(ex)) {
                         log.error("[{}] Failed to skip all messages for 
subscription {} on topic {}",
@@ -1767,7 +1763,7 @@ public class PersistentTopicsBase extends AdminResource {
                     }
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
-                }));
+                });
     }
 
     protected void internalSkipMessages(AsyncResponse asyncResponse, String 
subName, int numMessages,
@@ -1930,7 +1926,7 @@ public class PersistentTopicsBase extends AdminResource {
                     for (int i = 0; i < subNames.size(); i++) {
                         try {
                             
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
-                                    subNames.get(i), expireTimeInSeconds, 
authoritative));
+                                    subNames.get(i), expireTimeInSeconds));
                         } catch (Exception e) {
                             log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}",
                                     clientAppId(), expireTimeInSeconds, 
topicName, e);
@@ -3441,61 +3437,68 @@ public class PersistentTopicsBase extends AdminResource 
{
             future = CompletableFuture.completedFuture(null);
         }
         future.thenCompose(__ ->
-            // If the topic name is a partition name, no need to get partition 
topic metadata again
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenCompose(partitionMetadata -> {
-                        if (topicName.isPartitioned()) {
-                            return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, 
subName,
-                                    expireTimeInSeconds, authoritative)
-                                    .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
-                        } else {
-                            if (partitionMetadata.partitions > 0) {
-                                return 
CompletableFuture.completedFuture(null).thenAccept(unused -> {
-                                    final List<CompletableFuture<Void>> 
futures = Lists.newArrayList();
-
-                                    // expire messages for each partition topic
-                                    for (int i = 0; i < 
partitionMetadata.partitions; i++) {
-                                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                                        try {
-                                            futures.add(pulsar()
-                                                    .getAdminClient()
-                                                    .topics()
-                                                    
.expireMessagesAsync(topicNamePartition.toString(),
-                                                            subName, 
expireTimeInSeconds));
-                                        } catch (Exception e) {
-                                            log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
-                                                    expireTimeInSeconds, 
topicNamePartition, e);
-                                            asyncResponse.resume(new 
RestException(e));
-                                            return;
+            validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
+                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(unused2 ->
+                        // If the topic name is a partition name, no need to 
get partition topic metadata again
+                        getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+                                .thenCompose(partitionMetadata -> {
+                                    if (topicName.isPartitioned()) {
+                                        return 
internalExpireMessagesByTimestampForSinglePartitionAsync
+                                                (partitionMetadata, subName, 
expireTimeInSeconds)
+                                                .thenAccept(unused3 ->
+                                                        
asyncResponse.resume(Response.noContent().build()));
+                                    } else {
+                                        if (partitionMetadata.partitions > 0) {
+                                            return 
CompletableFuture.completedFuture(null).thenAccept(unused -> {
+                                                final 
List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                                                // expire messages for each 
partition topic
+                                                for (int i = 0; i < 
partitionMetadata.partitions; i++) {
+                                                    TopicName 
topicNamePartition = topicName.getPartition(i);
+                                                    try {
+                                                        futures.add(pulsar()
+                                                                
.getAdminClient()
+                                                                .topics()
+                                                                
.expireMessagesAsync(topicNamePartition.toString(),
+                                                                        
subName, expireTimeInSeconds));
+                                                    } catch (Exception e) {
+                                                        log.error("[{}] Failed 
to expire messages up to {} on {}",
+                                                                clientAppId(),
+                                                                
expireTimeInSeconds, topicNamePartition, e);
+                                                        
asyncResponse.resume(new RestException(e));
+                                                        return;
+                                                    }
+                                                }
+
+                                                
FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                                                    if (exception != null) {
+                                                        Throwable t = 
exception.getCause();
+                                                        if (t instanceof 
NotFoundException) {
+                                                            
asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                                                    
"Subscription not found"));
+                                                            return null;
+                                                        } else {
+                                                            log.error("[{}] 
Failed to expire messages up "
+                                                                            + 
"to {} on {}", clientAppId(),
+                                                                    
expireTimeInSeconds, topicName, t);
+                                                            
asyncResponse.resume(new RestException(t));
+                                                            return null;
+                                                        }
+                                                    }
+                                                    
asyncResponse.resume(Response.noContent().build());
+                                                    return null;
+                                                });
+                                            });
+                                        } else {
+                                            return 
internalExpireMessagesByTimestampForSinglePartitionAsync
+                                                    (partitionMetadata, 
subName, expireTimeInSeconds)
+                                                    .thenAccept(unused ->
+                                                            
asyncResponse.resume(Response.noContent().build()));
                                         }
                                     }
+                                }))
 
-                                    
FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                                        if (exception != null) {
-                                            Throwable t = exception.getCause();
-                                            if (t instanceof 
NotFoundException) {
-                                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                                        "Subscription not 
found"));
-                                                return null;
-                                            } else {
-                                                log.error("[{}] Failed to 
expire messages up to {} on {}",
-                                                        clientAppId(), 
expireTimeInSeconds,
-                                                        topicName, t);
-                                                asyncResponse.resume(new 
RestException(t));
-                                                return null;
-                                            }
-                                        }
-                                        
asyncResponse.resume(Response.noContent().build());
-                                        return null;
-                                    });
-                                });
-                            } else {
-                                return 
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
-                                        subName, expireTimeInSeconds, 
authoritative)
-                                        .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
-                            }
-                        }
-                    })
         ).exceptionally(ex -> {
             // If the exception is not redirect exception we need to log it.
             if (!isRedirectException(ex)) {
@@ -3508,69 +3511,65 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     private CompletableFuture<Void> 
internalExpireMessagesByTimestampForSinglePartitionAsync(
-            PartitionedTopicMetadata partitionMetadata, String subName, int 
expireTimeInSeconds,
-            boolean authoritative) {
+            PartitionedTopicMetadata partitionMetadata, String subName, int 
expireTimeInSeconds) {
         if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
             String msg = "This method should not be called for partitioned 
topic";
             return FutureUtil.failedFuture(new IllegalStateException(msg));
         } else {
             final CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
-            validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES)
-                    .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
-                    .thenCompose(__ -> 
getTopicReferenceAsync(topicName).thenAccept(t -> {
-                         if (t == null) {
-                             resultFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Topic not found"));
-                             return;
-                         }
-                        if (!(t instanceof PersistentTopic)) {
-                            resultFuture.completeExceptionally(new 
RestException(Status.METHOD_NOT_ALLOWED,
-                                    "Expire messages on a non-persistent topic 
is not allowed"));
-                            return;
-                        }
-                        PersistentTopic topic = (PersistentTopic) t;
-
-                        boolean issued;
-                        if (subName.startsWith(topic.getReplicatorPrefix())) {
-                            String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                            PersistentReplicator repl = (PersistentReplicator) 
topic
-                                    .getPersistentReplicator(remoteCluster);
-                            if (repl == null) {
-                                resultFuture.completeExceptionally(
-                                        new RestException(Status.NOT_FOUND, 
"Replicator not found"));
-                                return;
-                            }
-                            issued = repl.expireMessages(expireTimeInSeconds);
-                        } else {
-                            PersistentSubscription sub = 
topic.getSubscription(subName);
-                            if (sub == null) {
-                                resultFuture.completeExceptionally(
-                                        new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                                return;
-                            }
-                            issued = sub.expireMessages(expireTimeInSeconds);
-                        }
-                        if (issued) {
-                            log.info("[{}] Message expire started up to {} on 
{} {}", clientAppId(),
-                                    expireTimeInSeconds, topicName, subName);
-                            resultFuture.complete(__);
-                        } else {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Expire message by timestamp not 
issued on topic {} for subscription {} "
-                                        + "due to ongoing message expiration 
not finished or subscription almost"
-                                        + " catch up. If it's performed on a 
partitioned topic operation might "
-                                        + "succeeded on other partitions, 
please check stats of individual "
-                                        + "partition.", topicName, subName);
-                            }
-                            resultFuture.completeExceptionally(new 
RestException(Status.CONFLICT, "Expire message "
-                                    + "by timestamp not issued on topic " + 
topicName + " for subscription "
-                                    + subName + " due to ongoing message 
expiration not finished or subscription "
-                                    + "almost catch  up. If it's performed on 
a partitioned topic operation might"
-                                    + " succeeded on other partitions, please 
check stats of individual partition."
-                            ));
-                            return;
-                        }
-                            })
-                    ).exceptionally(e -> {
+            getTopicReferenceAsync(topicName).thenAccept(t -> {
+                 if (t == null) {
+                     resultFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Topic not found"));
+                     return;
+                 }
+                if (!(t instanceof PersistentTopic)) {
+                    resultFuture.completeExceptionally(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                            "Expire messages on a non-persistent topic is not 
allowed"));
+                    return;
+                }
+                PersistentTopic topic = (PersistentTopic) t;
+
+                boolean issued;
+                if (subName.startsWith(topic.getReplicatorPrefix())) {
+                    String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                    PersistentReplicator repl = (PersistentReplicator) topic
+                            .getPersistentReplicator(remoteCluster);
+                    if (repl == null) {
+                        resultFuture.completeExceptionally(
+                                new RestException(Status.NOT_FOUND, 
"Replicator not found"));
+                        return;
+                    }
+                    issued = repl.expireMessages(expireTimeInSeconds);
+                } else {
+                    PersistentSubscription sub = 
topic.getSubscription(subName);
+                    if (sub == null) {
+                        resultFuture.completeExceptionally(
+                                new RestException(Status.NOT_FOUND, 
"Subscription not found"));
+                        return;
+                    }
+                    issued = sub.expireMessages(expireTimeInSeconds);
+                }
+                if (issued) {
+                    log.info("[{}] Message expire started up to {} on {} {}", 
clientAppId(),
+                            expireTimeInSeconds, topicName, subName);
+                    resultFuture.complete(null);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Expire message by timestamp not issued on 
topic {} for subscription {} "
+                                + "due to ongoing message expiration not 
finished or subscription almost"
+                                + " catch up. If it's performed on a 
partitioned topic operation might "
+                                + "succeeded on other partitions, please check 
stats of individual "
+                                + "partition.", topicName, subName);
+                    }
+                    resultFuture.completeExceptionally(new 
RestException(Status.CONFLICT, "Expire message "
+                            + "by timestamp not issued on topic " + topicName 
+ " for subscription "
+                            + subName + " due to ongoing message expiration 
not finished or subscription "
+                            + "almost catch  up. If it's performed on a 
partitioned topic operation might"
+                            + " succeeded on other partitions, please check 
stats of individual partition."
+                    ));
+                    return;
+                }
+            }).exceptionally(e -> {
                 
resultFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
                 return null;
             });

Reply via email to