This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 807c0a456bc make getList async #16221 (#18811)
807c0a456bc is described below
commit 807c0a456bc31985646f0901e36f2b1e79e1c84f
Author: congbo <[email protected]>
AuthorDate: Thu Dec 8 13:53:12 2022 +0800
make getList async #16221 (#18811)
Co-authored-by: congbobo184 <[email protected]>
---
.../apache/pulsar/broker/admin/AdminResource.java | 17 ++++++++++++++
.../broker/admin/impl/PersistentTopicsBase.java | 26 ++++++++++++++++++----
.../pulsar/broker/admin/v1/PersistentTopics.java | 18 ++++++++-------
.../pulsar/broker/admin/v2/PersistentTopics.java | 18 ++++++++-------
.../org/apache/pulsar/broker/admin/AdminTest.java | 3 ++-
5 files changed, 61 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index c1d846b1571..e78b57aa8c9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -825,4 +825,21 @@ public abstract class AdminResource extends
PulsarWebResource {
persistence.getBookkeeperAckQuorum()));
}
+
+ /**
+ * Check whether the given exception is a redirect exception.
+ *
+ * @param ex The throwable.
+ * @return Whether the exception is a redirect exception
+ */
+ protected static boolean isRedirectException(Throwable ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ return realCause instanceof WebApplicationException
+ && ((WebApplicationException)
realCause).getResponse().getStatus()
+ == Status.TEMPORARY_REDIRECT.getStatusCode();
+ }
+
+ protected static String getPartitionedTopicNotFoundErrorMessage(String
topic) {
+ return String.format("Partitioned Topic %s not found", topic);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 61fb3d69c02..0ddcfd0a7a4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -188,6 +188,19 @@ public class PersistentTopicsBase extends AdminResource {
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
}
+ protected CompletableFuture<List<String>> internalGetListAsync() {
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.GET_TOPICS)
+ .thenCompose(__ ->
namespaceResources().namespaceExistsAsync(namespaceName))
+ .thenAccept(exists -> {
+ if (!exists) {
+ throw new RestException(Status.NOT_FOUND, "Namespace
does not exist");
+ }
+ })
+ .thenCompose(__ ->
topicResources().listPersistentTopicsAsync(namespaceName))
+ .thenApply(topics -> topics.stream().filter(topic ->
+
!isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList()));
+ }
+
protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
@@ -3701,17 +3714,22 @@ public class PersistentTopicsBase extends AdminResource
{
return getPartitionedTopicMetadataAsync(
TopicName.get(topicName.getPartitionedTopicName()), false,
false)
- .thenApply(partitionedTopicMetadata -> {
+ .thenAccept(partitionedTopicMetadata -> {
if (partitionedTopicMetadata == null ||
partitionedTopicMetadata.partitions == 0) {
final String topicErrorType = partitionedTopicMetadata
== null ? "has no metadata" : "has zero
partitions";
throw new RestException(Status.NOT_FOUND,
String.format(
"Partitioned Topic not found: %s %s",
topicName.toString(), topicErrorType));
- } else if
(!internalGetList().contains(topicName.toString())) {
+ }
+ })
+ .thenCompose(__ -> internalGetListAsync())
+ .thenApply(topics -> {
+ if (!topics.contains(topicName.toString())) {
throw new RestException(Status.NOT_FOUND, "Topic
partitions were not yet created");
}
- throw new RestException(Status.NOT_FOUND, "Partitioned
Topic not found");
- });
+ throw new RestException(Status.NOT_FOUND,
+
getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
+ });
}
private Topic getOrCreateTopic(TopicName topicName) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index f3b5e6b56a3..5d12a59b823 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -73,14 +73,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Namespace doesn't exist")})
public void getList(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace")
String namespace) {
- try {
- validateNamespaceName(property, cluster, namespace);
- asyncResponse.resume(internalGetList());
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalGetListAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic list {}",
clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 8e178da303c..c9c5ad7720c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -95,14 +95,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace) {
- try {
- validateNamespaceName(tenant, namespace);
- asyncResponse.resume(internalGetList());
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalGetListAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic list {}",
clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 1e825d340c0..339de6e9ac6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -797,8 +797,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.createPolicies(NamespaceName.get(property, cluster,
namespace), new Policies());
AsyncResponse response = mock(AsyncResponse.class);
+
persistentTopics.getList(response, property, cluster, namespace);
- verify(response, times(1)).resume(Lists.newArrayList());
+ verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
// create topic
assertEquals(persistentTopics.getPartitionedTopicList(property,
cluster, namespace), Lists.newArrayList());
response = mock(AsyncResponse.class);