Demogorgon314 commented on a change in pull request #11354:
URL: https://github.com/apache/pulsar/pull/11354#discussion_r671822251
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1030,34 +1030,35 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
 false).thenAccept(partitionMetadata -> {
     if (partitionMetadata.partitions > 0) {
         try {
-            // get the subscriptions only from the 1st partition
-            // since all the other partitions will have the same
-            // subscriptions
-            pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
-                    .whenComplete((r, ex) -> {
-                        if (ex != null) {
-                            log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
-                                    topicName, ex.getMessage());
-
-                            if (ex instanceof PulsarAdminException) {
-                                PulsarAdminException pae = (PulsarAdminException) ex;
-                                if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                                            "Internal topics have not been generated yet"));
-                                    return;
-                                } else {
-                                    asyncResponse.resume(new RestException(pae));
-                                    return;
-                                }
-                            } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
-                            }
+            final Set<String> subscriptions = Sets.newConcurrentHashSet();
+            final List<CompletableFuture<Object>> subscriptionFutures = Lists.newArrayList();
+            for (int i = 0; i < partitionMetadata.partitions; i++) {
+                CompletableFuture<List<String>> subscriptionsAsync = pulsar().getAdminClient().topics()
+                        .getSubscriptionsAsync(topicName.getPartition(i).toString());
+                subscriptionFutures.add(subscriptionsAsync.thenApply(r -> subscriptions.addAll(r)));
+            }
+            FutureUtil.waitForAll(subscriptionFutures).whenComplete((r, ex) -> {
+                if (ex != null) {
+                    log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
+                            topicName, ex.getMessage());
+                    if (ex instanceof PulsarAdminException) {
Review comment:
In this case, maybe throw a NotFoundException with the message "Topic
partitions were not yet created".
Example:
```
./bin/pulsar-admin topics create-partitioned-topic public/default/nodatahere -p 3
./bin/pulsar-admin topics create-subscription -s sub_0 -m earliest persistent://public/default/nodatahere-partition-0
./bin/pulsar-admin topics create-subscription -s sub_1 -m earliest persistent://public/default/nodatahere-partition-0
./bin/pulsar-admin topics create-subscription -s sub_2 -m earliest persistent://public/default/nodatahere-partition-1
```
Wait for brokerDeleteInactiveTopicsFrequencySeconds (default: 60s), or call
PersistentTopic.checkGC(). If the deleteMode is
InactiveTopicDeleteMode.delete_when_no_subscriptions, the topic
nodatahere-partition-2 will be deleted, since it has no subscriptions.
Then run:
```
./bin/pulsar-admin topics subscriptions persistent://public/default/nodatahere-partition-2
./bin/pulsar-admin topics subscriptions persistent://public/default/nodatahere
```
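A rough sketch of the handling suggested above, using only the JDK so it can run without a broker. Every name in it is a hypothetical stand-in rather than Pulsar API: `PartitionMissingException` plays the role of a `PulsarAdminException` carrying a NOT_FOUND status, `NotFoundException` plays the role of the REST-level error, and `collectSubscriptions` mirrors the per-partition loop in the diff, with `CompletableFuture.allOf` assumed in place of `FutureUtil.waitForAll`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

// Illustrative sketch only: none of these types are Pulsar classes.
class SubscriptionAggregator {

    /** Hypothetical stand-in for an admin-client failure with a NOT_FOUND status. */
    static class PartitionMissingException extends RuntimeException {
        PartitionMissingException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the "not found" error proposed in the review comment. */
    static class NotFoundException extends RuntimeException {
        NotFoundException(String message) {
            super(message);
        }
    }

    /**
     * Queries every partition and merges the subscription names into one set.
     * If any partition lookup fails because the partition does not exist,
     * the returned future fails with NotFoundException.
     */
    static CompletableFuture<Set<String>> collectSubscriptions(
            int partitions, IntFunction<CompletableFuture<List<String>>> fetchPartition) {
        Set<String> subscriptions = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            futures.add(fetchPartition.apply(i).thenAccept(subscriptions::addAll));
        }

        CompletableFuture<Set<String>> result = new CompletableFuture<>();
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, ex) -> {
                    if (ex == null) {
                        result.complete(subscriptions);
                        return;
                    }
                    // Dependent stages wrap the original failure in a CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    if (cause instanceof PartitionMissingException) {
                        result.completeExceptionally(
                                new NotFoundException("Topic partitions were not yet created"));
                    } else {
                        result.completeExceptionally(cause);
                    }
                });
        return result;
    }
}
```

In the scenario from the example, the lookup for nodatahere-partition-2 would be the one that fails, so the whole call for the partitioned topic would surface the not-found error instead of a generic failure. Whether the real endpoint should fail the whole request or skip the missing partition is a design question this sketch does not settle.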