This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 96dbfbfa498 [cherry-pick][branch-2.10] make getList async (#18819)
96dbfbfa498 is described below

commit 96dbfbfa498d61eb6ce9f89c77090dbe8658fab5
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Dec 9 18:35:20 2022 +0800

    [cherry-pick][branch-2.10] make getList async (#18819)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 35 ++++++++++++++++++++--
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 18 ++++++-----
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 18 ++++++-----
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  2 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |  2 ++
 .../systopic/PartitionedSystemTopicTest.java       |  3 +-
 6 files changed, 57 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 698da80265d..4fa7ff79bfe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -185,6 +185,32 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected CompletableFuture<List<String>> 
internalGetListAsync(Optional<String> bundle) {
+        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.GET_TOPICS)
+            .thenCompose(__ -> 
namespaceResources().namespaceExistsAsync(namespaceName))
+            .thenAccept(exists -> {
+                if (!exists) {
+                    throw new RestException(Status.NOT_FOUND, "Namespace does 
not exist");
+                }
+            })
+            .thenCompose(__ -> 
topicResources().listPersistentTopicsAsync(namespaceName))
+            .thenApply(topics ->
+                topics.stream()
+                    .filter(topic -> {
+                        if (isTransactionInternalName(TopicName.get(topic))) {
+                            return false;
+                        }
+                        if (bundle.isPresent()) {
+                            NamespaceBundle b = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                .getBundle(TopicName.get(topic));
+                            return b != null && 
bundle.get().equals(b.getBundleRange());
+                        }
+                        return true;
+                    })
+                    .collect(Collectors.toList())
+            );
+    }
+
     protected List<String> internalGetPartitionedTopicList() {
         validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_TOPICS);
         // Validate that namespace exists, throws 404 if it doesn't exist
@@ -203,6 +229,7 @@ public class PersistentTopicsBase extends AdminResource {
         return getPartitionedTopicList(TopicDomain.getEnum(domain()));
     }
 
+
     protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -4097,13 +4124,17 @@ public class PersistentTopicsBase extends AdminResource 
{
 
         return getPartitionedTopicMetadataAsync(
                 TopicName.get(topicName.getPartitionedTopicName()), false, 
false)
-                .thenApply(partitionedTopicMetadata -> {
+                .thenAccept(partitionedTopicMetadata -> {
                     if (partitionedTopicMetadata == null || 
partitionedTopicMetadata.partitions == 0) {
                         final String topicErrorType = partitionedTopicMetadata
                                 == null ? "has no metadata" : "has zero 
partitions";
                         throw new RestException(Status.NOT_FOUND, 
String.format(
                                 "Partitioned Topic not found: %s %s", 
topicName.toString(), topicErrorType));
-                    } else if 
(!internalGetList(Optional.empty()).contains(topicName.toString())) {
+                    }
+                })
+                .thenCompose(__ -> internalGetListAsync(Optional.empty()))
+                .thenApply(topics -> {
+                    if (!topics.contains(topicName.toString())) {
                         throw new RestException(Status.NOT_FOUND, "Topic 
partitions were not yet created");
                     }
                     throw new RestException(Status.NOT_FOUND, "Partitioned 
Topic not found");
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index ba3a69cc0be..2009de113d2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -77,14 +77,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
             @ApiParam(value = "Specify the bundle name", required = false)
             @QueryParam("bundle") String bundle) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalGetListAsync(Optional.ofNullable(bundle))
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to get topic list {}", 
clientAppId(), namespaceName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 56e3799c475..a607db8b6aa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -101,14 +101,16 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify the bundle name", required = false)
             @QueryParam("bundle") String bundle) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalGetListAsync(Optional.ofNullable(bundle))
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to get topic list {}", 
clientAppId(), namespaceName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index f3802235686..cdca5f33a5b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -846,7 +846,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         AsyncResponse response = mock(AsyncResponse.class);
         persistentTopics.getList(response, property, cluster, namespace, null);
-        verify(response, times(1)).resume(Lists.newArrayList());
+        verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
         // create topic
         assertEquals(persistentTopics.getPartitionedTopicList(property, 
cluster, namespace), Lists.newArrayList());
         response = mock(AsyncResponse.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index e479519f393..89cc626b262 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -86,11 +86,13 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d8caccfd53d..9920a097e34 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.systopic;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -261,7 +260,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = 
systemTopicClientForNamespace.newWriterAsync();
         CompletableFuture<Void> f1 = 
admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
 
-        FutureUtil.waitForAll(Lists.newArrayList(writer1, writer2, f1)).join();
+        CompletableFuture.allOf(writer1, writer2, f1).join();
         Assert.assertTrue(reader1.hasMoreEvents());
         Assert.assertNotNull(reader1.readNext());
         Assert.assertTrue(reader2.hasMoreEvents());

Reply via email to