This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 609a58695a822ff8ed209b9b765f44bc42d093a0 Author: GuoJiwei <[email protected]> AuthorDate: Sat Jul 24 04:56:26 2021 +0800 [Issue 11339] Pulsar Admin List Subscription lists only subscriptions created for Partition-0 when partition specific subscriptions are created (#11355) Fix #11339. Documentation This is a bug fix, no need documentation. (cherry picked from commit a60c189c9b347f2b63176511048308d921bb8aab) --- .../broker/admin/impl/PersistentTopicsBase.java | 87 +++++++++++++++------- .../pulsar/broker/admin/PersistentTopicsTest.java | 22 +++++- 2 files changed, 82 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 53c647a..781de11 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.stream.Collectors; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; @@ -1027,34 +1028,42 @@ public class PersistentTopicsBase extends AdminResource { false).thenAccept(partitionMetadata -> { if (partitionMetadata.partitions > 0) { try { - // get the subscriptions only from the 1st partition - // since all the other partitions will have the same - // subscriptions - pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString()) - .whenComplete((r, ex) -> { - if (ex != null) { - log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(), - topicName, ex.getMessage()); - - if (ex instanceof PulsarAdminException) { - PulsarAdminException pae = (PulsarAdminException) ex; - if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Internal topics have not been generated yet")); - return; - } else { - asyncResponse.resume(new RestException(pae)); - return; - } - } else { - asyncResponse.resume(new RestException(ex)); - return; - } + final Set<String> subscriptions = Sets.newConcurrentHashSet(); + final List<CompletableFuture<Object>> subscriptionFutures = Lists.newArrayList(); + if (topicName.getDomain() == TopicDomain.persistent) { + final Map<Integer, CompletableFuture<Boolean>> existsFutures = Maps.newConcurrentMap(); + for (int i = 0; i < partitionMetadata.partitions; i++) { + String path = String.format("/managed-ledgers/%s/%s/%s", namespaceName.toString(), + domain(), topicName.getPartition(i).getEncodedLocalName()); + CompletableFuture<Boolean> exists = getLocalPolicies().existsAsync(path); + existsFutures.put(i, exists); + } + FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values())).thenApply(__ -> + existsFutures.entrySet().stream().filter(e -> e.getValue().join().booleanValue()) + .map(item -> topicName.getPartition(item.getKey()).toString()) + .collect(Collectors.toList()) + ).thenAccept(topics -> { + if (log.isDebugEnabled()) { + log.debug("activeTopics : {}", topics); + } + topics.forEach(topic -> { + try { + CompletableFuture<List<String>> subscriptionsAsync = pulsar().getAdminClient() + .topics().getSubscriptionsAsync(topic); + subscriptionFutures.add(subscriptionsAsync.thenApply(subscriptions::addAll)); + } catch (PulsarServerException e) { + throw new RestException(e); } - final List<String> subscriptions = Lists.newArrayList(); - subscriptions.addAll(r); - asyncResponse.resume(subscriptions); }); + }).thenAccept(__ -> resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures)); + } else { + for (int i = 0; i < partitionMetadata.partitions; i++) { + CompletableFuture<List<String>> subscriptionsAsync = pulsar().getAdminClient().topics() + .getSubscriptionsAsync(topicName.getPartition(i).toString()); + subscriptionFutures.add(subscriptionsAsync.thenApply(subscriptions::addAll)); + } + resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures); + } } catch (Exception e) { log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); asyncResponse.resume(e); @@ -1070,6 +1079,32 @@ public class PersistentTopicsBase extends AdminResource { } } + private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscriptions, + List<CompletableFuture<Object>> subscriptionFutures) { + FutureUtil.waitForAll(subscriptionFutures).whenComplete((r, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(), + topicName, ex.getMessage()); + if (ex instanceof PulsarAdminException) { + PulsarAdminException pae = (PulsarAdminException) ex; + if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Internal topics have not been generated yet")); + return; + } else { + asyncResponse.resume(new RestException(pae)); + return; + } + } else { + asyncResponse.resume(new RestException(ex)); + return; + } + } else { + asyncResponse.resume(new ArrayList<>(subscriptions)); + } + }); + } + private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { try { validateTopicOwnership(topicName, authoritative); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index ab5604e..12b9529 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -201,7 +201,27 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { true); verify(response, timeout(5000).times(1)).resume(Lists.newArrayList()); - // 8) Delete the partitioned topic + // 8) Create a sub of partitioned-topic + response = mock(AsyncResponse.class); + persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", "test", true, + (MessageIdImpl) MessageId.earliest, false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + // + response = mock(AsyncResponse.class); + persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", true); + verify(response, timeout(5000).times(1)).resume(Lists.newArrayList("test")); + // + response = mock(AsyncResponse.class); + persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true); + verify(response, timeout(5000).times(1)).resume(Lists.newArrayList()); + // + response = mock(AsyncResponse.class); + persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName, true); + verify(response, timeout(5000).times(1)).resume(Lists.newArrayList("test")); + + // 9) Delete the partitioned topic response = mock(AsyncResponse.class); persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true, false); responseCaptor = ArgumentCaptor.forClass(Response.class);
