This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c6c7d6dd41cb2480408df35e29d652282f477587 Author: gaozhangmin <[email protected]> AuthorDate: Thu Apr 21 09:59:37 2022 +0800 Fix duplicate validateTopicOwnershipAsync (#15120) Co-authored-by: gavingaozhangmin <[email protected]> (cherry picked from commit 151f1d1d3e14df9166547d1aed829c774ccce99d) --- .../broker/admin/impl/PersistentTopicsBase.java | 259 ++++++++++----------- 1 file changed, 129 insertions(+), 130 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3c3f622266c..0b92e1ab981 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1039,11 +1039,13 @@ public class PersistentTopicsBase extends AdminResource { } else { future = CompletableFuture.completedFuture(null); } - future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) - .thenAccept(unused -> { + future.thenCompose(__ -> + validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS) + .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenAccept(unused1 -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { - internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative); + internalGetSubscriptionsForNonPartitionedTopic(asyncResponse); } else { getPartitionedTopicMetadataAsync(topicName, authoritative, false) .thenAccept(partitionMetadata -> { @@ -1061,7 +1063,7 @@ public class PersistentTopicsBase extends AdminResource { topicResources().persistentTopicExists(topicName.getPartition(i))); } FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values())) - .thenApply(__ -> + .thenApply(unused2 -> existsFutures.entrySet().stream().filter(e -> e.getValue().join()) .map(item -> topicName.getPartition(item.getKey()).toString()) .collect(Collectors.toList()) @@ -1080,7 +1082,7 @@ public class PersistentTopicsBase extends AdminResource { throw new RestException(e); } }); - }).thenAccept(__ -> resumeAsyncResponse(asyncResponse, + }).thenAccept(unused3 -> resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures)); } else { for (int i = 0; i < partitionMetadata.partitions; i++) { @@ -1098,7 +1100,7 @@ public class PersistentTopicsBase extends AdminResource { asyncResponse.resume(e); } } else { - internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative); + internalGetSubscriptionsForNonPartitionedTopic(asyncResponse); } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. @@ -1118,7 +1120,8 @@ public class PersistentTopicsBase extends AdminResource { } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; - }); + }) + ); } private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscriptions, @@ -1147,10 +1150,8 @@ public class PersistentTopicsBase extends AdminResource { }); } - private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)) - .thenCompose(__ -> getTopicReferenceAsync(topicName)) + private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) { + getTopicReferenceAsync(topicName) .thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys()))) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. @@ -1725,11 +1726,7 @@ public class PersistentTopicsBase extends AdminResource { private CompletableFuture<Void> internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse, String subName, boolean authoritative) { - return validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> - validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)) - .thenCompose(__ -> - getTopicReferenceAsync(topicName).thenCompose(t -> { + return getTopicReferenceAsync(topicName).thenCompose(t -> { PersistentTopic topic = (PersistentTopic) t; BiConsumer<Void, Throwable> biConsumer = (v, ex) -> { if (ex != null) { @@ -1758,8 +1755,7 @@ public class PersistentTopicsBase extends AdminResource { } return sub.clearBacklog().whenComplete(biConsumer); } - }) - .exceptionally(ex -> { + }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isRedirectException(ex)) { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", @@ -1767,7 +1763,7 @@ public class PersistentTopicsBase extends AdminResource { } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; - })); + }); } protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages, @@ -1930,7 +1926,7 @@ public class PersistentTopicsBase extends AdminResource { for (int i = 0; i < subNames.size(); i++) { try { futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, - subNames.get(i), expireTimeInSeconds, authoritative)); + subNames.get(i), expireTimeInSeconds)); } catch (Exception e) { log.error("[{}] Failed to expire messages for all subscription up to {} on {}", clientAppId(), expireTimeInSeconds, topicName, e); @@ -3441,61 +3437,68 @@ public class PersistentTopicsBase extends AdminResource { future = CompletableFuture.completedFuture(null); } future.thenCompose(__ -> - // If the topic name is a partition name, no need to get partition topic metadata again - getPartitionedTopicMetadataAsync(topicName, authoritative, false) - .thenCompose(partitionMetadata -> { - if (topicName.isPartitioned()) { - return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, subName, - expireTimeInSeconds, authoritative) - .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); - } else { - if (partitionMetadata.partitions > 0) { - return CompletableFuture.completedFuture(null).thenAccept(unused -> { - final List<CompletableFuture<Void>> futures = Lists.newArrayList(); - - // expire messages for each partition topic - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar() - .getAdminClient() - .topics() - .expireMessagesAsync(topicNamePartition.toString(), - subName, expireTimeInSeconds)); - } catch (Exception e) { - log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), - expireTimeInSeconds, topicNamePartition, e); - asyncResponse.resume(new RestException(e)); - return; + validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES) + .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(unused2 -> + // If the topic name is a partition name, no need to get partition topic metadata again + getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenCompose(partitionMetadata -> { + if (topicName.isPartitioned()) { + return internalExpireMessagesByTimestampForSinglePartitionAsync + (partitionMetadata, subName, expireTimeInSeconds) + .thenAccept(unused3 -> + asyncResponse.resume(Response.noContent().build())); + } else { + if (partitionMetadata.partitions > 0) { + return CompletableFuture.completedFuture(null).thenAccept(unused -> { + final List<CompletableFuture<Void>> futures = Lists.newArrayList(); + + // expire messages for each partition topic + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar() + .getAdminClient() + .topics() + .expireMessagesAsync(topicNamePartition.toString(), + subName, expireTimeInSeconds)); + } catch (Exception e) { + log.error("[{}] Failed to expire messages up to {} on {}", + clientAppId(), + expireTimeInSeconds, topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to expire messages up " + + "to {} on {}", clientAppId(), + expireTimeInSeconds, topicName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + asyncResponse.resume(Response.noContent().build()); + return null; + }); + }); + } else { + return internalExpireMessagesByTimestampForSinglePartitionAsync + (partitionMetadata, subName, expireTimeInSeconds) + .thenAccept(unused -> + asyncResponse.resume(Response.noContent().build())); } } + })) - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Subscription not found")); - return null; - } else { - log.error("[{}] Failed to expire messages up to {} on {}", - clientAppId(), expireTimeInSeconds, - topicName, t); - asyncResponse.resume(new RestException(t)); - return null; - } - } - asyncResponse.resume(Response.noContent().build()); - return null; - }); - }); - } else { - return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, - subName, expireTimeInSeconds, authoritative) - .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); - } - } - }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isRedirectException(ex)) { @@ -3508,69 +3511,65 @@ public class PersistentTopicsBase extends AdminResource { } private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartitionAsync( - PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds, - boolean authoritative) { + PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds) { if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) { String msg = "This method should not be called for partitioned topic"; return FutureUtil.failedFuture(new IllegalStateException(msg)); } else { final CompletableFuture<Void> resultFuture = new CompletableFuture<>(); - validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES) - .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) - .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> { - if (t == null) { - resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found")); - return; - } - if (!(t instanceof PersistentTopic)) { - resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED, - "Expire messages on a non-persistent topic is not allowed")); - return; - } - PersistentTopic topic = (PersistentTopic) t; - - boolean issued; - if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) topic - .getPersistentReplicator(remoteCluster); - if (repl == null) { - resultFuture.completeExceptionally( - new RestException(Status.NOT_FOUND, "Replicator not found")); - return; - } - issued = repl.expireMessages(expireTimeInSeconds); - } else { - PersistentSubscription sub = topic.getSubscription(subName); - if (sub == null) { - resultFuture.completeExceptionally( - new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } - issued = sub.expireMessages(expireTimeInSeconds); - } - if (issued) { - log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), - expireTimeInSeconds, topicName, subName); - resultFuture.complete(__); - } else { - if (log.isDebugEnabled()) { - log.debug("Expire message by timestamp not issued on topic {} for subscription {} " - + "due to ongoing message expiration not finished or subscription almost" - + " catch up. If it's performed on a partitioned topic operation might " - + "succeeded on other partitions, please check stats of individual " - + "partition.", topicName, subName); - } - resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message " - + "by timestamp not issued on topic " + topicName + " for subscription " - + subName + " due to ongoing message expiration not finished or subscription " - + "almost catch up. If it's performed on a partitioned topic operation might" - + " succeeded on other partitions, please check stats of individual partition." - )); - return; - } - }) - ).exceptionally(e -> { + getTopicReferenceAsync(topicName).thenAccept(t -> { + if (t == null) { + resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found")); + return; + } + if (!(t instanceof PersistentTopic)) { + resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED, + "Expire messages on a non-persistent topic is not allowed")); + return; + } + PersistentTopic topic = (PersistentTopic) t; + + boolean issued; + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = (PersistentReplicator) topic + .getPersistentReplicator(remoteCluster); + if (repl == null) { + resultFuture.completeExceptionally( + new RestException(Status.NOT_FOUND, "Replicator not found")); + return; + } + issued = repl.expireMessages(expireTimeInSeconds); + } else { + PersistentSubscription sub = topic.getSubscription(subName); + if (sub == null) { + resultFuture.completeExceptionally( + new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + issued = sub.expireMessages(expireTimeInSeconds); + } + if (issued) { + log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), + expireTimeInSeconds, topicName, subName); + resultFuture.complete(null); + } else { + if (log.isDebugEnabled()) { + log.debug("Expire message by timestamp not issued on topic {} for subscription {} " + + "due to ongoing message expiration not finished or subscription almost" + + " catch up. If it's performed on a partitioned topic operation might " + + "succeeded on other partitions, please check stats of individual " + + "partition.", topicName, subName); + } + resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message " + + "by timestamp not issued on topic " + topicName + " for subscription " + + subName + " due to ongoing message expiration not finished or subscription " + + "almost catch up. If it's performed on a partitioned topic operation might" + + " succeeded on other partitions, please check stats of individual partition." + )); + return; + } + }).exceptionally(e -> { resultFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); return null; });
