This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 609a58695a822ff8ed209b9b765f44bc42d093a0
Author: GuoJiwei <[email protected]>
AuthorDate: Sat Jul 24 04:56:26 2021 +0800

    [Issue 11339] Pulsar Admin List Subscription lists only subscriptions 
created for Partition-0 when partition specific subscriptions are created 
(#11355)
    
    Fix #11339.
    
    Documentation
    This is a bug fix; no documentation is needed.
    
    (cherry picked from commit a60c189c9b347f2b63176511048308d921bb8aab)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 87 +++++++++++++++-------
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 22 +++++-
 2 files changed, 82 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 53c647a..781de11 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
@@ -1027,34 +1028,42 @@ public class PersistentTopicsBase extends AdminResource 
{
                     false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions > 0) {
                     try {
-                        // get the subscriptions only from the 1st partition
-                        // since all the other partitions will have the same
-                        // subscriptions
-                        
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
-                                .whenComplete((r, ex) -> {
-                                    if (ex != null) {
-                                        log.warn("[{}] Failed to get list of 
subscriptions for {}: {}", clientAppId(),
-                                                topicName, ex.getMessage());
-
-                                        if (ex instanceof 
PulsarAdminException) {
-                                            PulsarAdminException pae = 
(PulsarAdminException) ex;
-                                            if (pae.getStatusCode() == 
Status.NOT_FOUND.getStatusCode()) {
-                                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                                        "Internal topics have 
not been generated yet"));
-                                                return;
-                                            } else {
-                                                asyncResponse.resume(new 
RestException(pae));
-                                                return;
-                                            }
-                                        } else {
-                                            asyncResponse.resume(new 
RestException(ex));
-                                            return;
-                                        }
+                        final Set<String> subscriptions = 
Sets.newConcurrentHashSet();
+                        final List<CompletableFuture<Object>> 
subscriptionFutures = Lists.newArrayList();
+                        if (topicName.getDomain() == TopicDomain.persistent) {
+                            final Map<Integer, CompletableFuture<Boolean>> 
existsFutures = Maps.newConcurrentMap();
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                String path = 
String.format("/managed-ledgers/%s/%s/%s", namespaceName.toString(),
+                                        domain(), 
topicName.getPartition(i).getEncodedLocalName());
+                                CompletableFuture<Boolean> exists = 
getLocalPolicies().existsAsync(path);
+                                existsFutures.put(i, exists);
+                            }
+                            
FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values())).thenApply(__ 
->
+                                    existsFutures.entrySet().stream().filter(e 
-> e.getValue().join().booleanValue())
+                                            .map(item -> 
topicName.getPartition(item.getKey()).toString())
+                                            .collect(Collectors.toList())
+                            ).thenAccept(topics -> {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("activeTopics : {}", topics);
+                                }
+                                topics.forEach(topic -> {
+                                    try {
+                                        CompletableFuture<List<String>> 
subscriptionsAsync = pulsar().getAdminClient()
+                                                
.topics().getSubscriptionsAsync(topic);
+                                        
subscriptionFutures.add(subscriptionsAsync.thenApply(subscriptions::addAll));
+                                    } catch (PulsarServerException e) {
+                                        throw new RestException(e);
                                     }
-                                    final List<String> subscriptions = 
Lists.newArrayList();
-                                    subscriptions.addAll(r);
-                                    asyncResponse.resume(subscriptions);
                                 });
+                            }).thenAccept(__ -> 
resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures));
+                        } else {
+                            for (int i = 0; i < partitionMetadata.partitions; 
i++) {
+                                CompletableFuture<List<String>> 
subscriptionsAsync = pulsar().getAdminClient().topics()
+                                        
.getSubscriptionsAsync(topicName.getPartition(i).toString());
+                                
subscriptionFutures.add(subscriptionsAsync.thenApply(subscriptions::addAll));
+                            }
+                            resumeAsyncResponse(asyncResponse, subscriptions, 
subscriptionFutures);
+                        }
                     } catch (Exception e) {
                         log.error("[{}] Failed to get list of subscriptions 
for {}", clientAppId(), topicName, e);
                         asyncResponse.resume(e);
@@ -1070,6 +1079,32 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> 
subscriptions,
+                                     List<CompletableFuture<Object>> 
subscriptionFutures) {
+        FutureUtil.waitForAll(subscriptionFutures).whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.warn("[{}] Failed to get list of subscriptions for {}: 
{}", clientAppId(),
+                        topicName, ex.getMessage());
+                if (ex instanceof PulsarAdminException) {
+                    PulsarAdminException pae = (PulsarAdminException) ex;
+                    if (pae.getStatusCode() == 
Status.NOT_FOUND.getStatusCode()) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                "Internal topics have not been generated 
yet"));
+                        return;
+                    } else {
+                        asyncResponse.resume(new RestException(pae));
+                        return;
+                    }
+                } else {
+                    asyncResponse.resume(new RestException(ex));
+                    return;
+                }
+            } else {
+                asyncResponse.resume(new ArrayList<>(subscriptions));
+            }
+        });
+    }
+
     private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
         try {
             validateTopicOwnership(topicName, authoritative);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index ab5604e..12b9529 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -201,7 +201,27 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
                 true);
         verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
 
-        // 8) Delete the partitioned topic
+        // 8) Create a sub of partitioned-topic
+        response = mock(AsyncResponse.class);
+        persistentTopics.createSubscription(response, testTenant, 
testNamespace, testLocalTopicName + "-partition-1", "test", true,
+                (MessageIdImpl) MessageId.earliest, false);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+        //
+        response = mock(AsyncResponse.class);
+        persistentTopics.getSubscriptions(response, testTenant, testNamespace, 
testLocalTopicName + "-partition-1", true);
+        verify(response, 
timeout(5000).times(1)).resume(Lists.newArrayList("test"));
+        //
+        response = mock(AsyncResponse.class);
+        persistentTopics.getSubscriptions(response, testTenant, testNamespace, 
testLocalTopicName + "-partition-0", true);
+        verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
+        //
+        response = mock(AsyncResponse.class);
+        persistentTopics.getSubscriptions(response, testTenant, testNamespace, 
testLocalTopicName, true);
+        verify(response, 
timeout(5000).times(1)).resume(Lists.newArrayList("test"));
+
+        // 9) Delete the partitioned topic
         response = mock(AsyncResponse.class);
         persistentTopics.deletePartitionedTopic(response, testTenant, 
testNamespace, testLocalTopicName, true, true, false);
         responseCaptor = ArgumentCaptor.forClass(Response.class);

Reply via email to