This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 807c0a456bc make getList async #16221 (#18811)
807c0a456bc is described below

commit 807c0a456bc31985646f0901e36f2b1e79e1c84f
Author: congbo <[email protected]>
AuthorDate: Thu Dec 8 13:53:12 2022 +0800

    make getList async #16221 (#18811)
    
    Co-authored-by: congbobo184 <[email protected]>
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 17 ++++++++++++++
 .../broker/admin/impl/PersistentTopicsBase.java    | 26 ++++++++++++++++++----
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 18 ++++++++-------
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 18 ++++++++-------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  3 ++-
 5 files changed, 61 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index c1d846b1571..e78b57aa8c9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -825,4 +825,21 @@ public abstract class AdminResource extends 
PulsarWebResource {
                         persistence.getBookkeeperAckQuorum()));
 
     }
+
+    /**
+     * Check current exception whether is redirect exception.
+     *
+     * @param ex The throwable.
+     * @return Whether is redirect exception
+     */
+    protected static boolean isRedirectException(Throwable ex) {
+        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+        return realCause instanceof WebApplicationException
+                && ((WebApplicationException) 
realCause).getResponse().getStatus()
+                == Status.TEMPORARY_REDIRECT.getStatusCode();
+    }
+
+    protected static String getPartitionedTopicNotFoundErrorMessage(String 
topic) {
+        return String.format("Partitioned Topic %s not found", topic);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 61fb3d69c02..0ddcfd0a7a4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -188,6 +188,19 @@ public class PersistentTopicsBase extends AdminResource {
         return getPartitionedTopicList(TopicDomain.getEnum(domain()));
     }
 
+    protected CompletableFuture<List<String>> internalGetListAsync() {
+        return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.GET_TOPICS)
+                .thenCompose(__ -> 
namespaceResources().namespaceExistsAsync(namespaceName))
+                .thenAccept(exists -> {
+                    if (!exists) {
+                        throw new RestException(Status.NOT_FOUND, "Namespace 
does not exist");
+                    }
+                })
+                .thenCompose(__ -> 
topicResources().listPersistentTopicsAsync(namespaceName))
+                .thenApply(topics -> topics.stream().filter(topic ->
+                        
!isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList()));
+    }
+
     protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -3701,17 +3714,22 @@ public class PersistentTopicsBase extends AdminResource 
{
 
         return getPartitionedTopicMetadataAsync(
                 TopicName.get(topicName.getPartitionedTopicName()), false, 
false)
-                .thenApply(partitionedTopicMetadata -> {
+                .thenAccept(partitionedTopicMetadata -> {
                     if (partitionedTopicMetadata == null || 
partitionedTopicMetadata.partitions == 0) {
                         final String topicErrorType = partitionedTopicMetadata
                                 == null ? "has no metadata" : "has zero 
partitions";
                         throw new RestException(Status.NOT_FOUND, 
String.format(
                                 "Partitioned Topic not found: %s %s", 
topicName.toString(), topicErrorType));
-                    } else if 
(!internalGetList().contains(topicName.toString())) {
+                    }
+                })
+                .thenCompose(__ -> internalGetListAsync())
+                .thenApply(topics -> {
+                    if (!topics.contains(topicName.toString())) {
                         throw new RestException(Status.NOT_FOUND, "Topic 
partitions were not yet created");
                     }
-                    throw new RestException(Status.NOT_FOUND, "Partitioned 
Topic not found");
-                });
+                    throw new RestException(Status.NOT_FOUND,
+                        
getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
+            });
     }
 
     private Topic getOrCreateTopic(TopicName topicName) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index f3b5e6b56a3..5d12a59b823 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -73,14 +73,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Namespace doesn't exist")})
     public void getList(@Suspended final AsyncResponse asyncResponse, 
@PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            asyncResponse.resume(internalGetList());
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalGetListAsync()
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to get topic list {}", 
clientAppId(), namespaceName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 8e178da303c..c9c5ad7720c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -95,14 +95,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            asyncResponse.resume(internalGetList());
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalGetListAsync()
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to get topic list {}", 
clientAppId(), namespaceName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 1e825d340c0..339de6e9ac6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -797,8 +797,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 .createPolicies(NamespaceName.get(property, cluster, 
namespace), new Policies());
 
         AsyncResponse response = mock(AsyncResponse.class);
+
         persistentTopics.getList(response, property, cluster, namespace);
-        verify(response, times(1)).resume(Lists.newArrayList());
+        verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
         // create topic
         assertEquals(persistentTopics.getPartitionedTopicList(property, 
cluster, namespace), Lists.newArrayList());
         response = mock(AsyncResponse.class);

Reply via email to