This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 92d229db95c [improve][broker][PIP-149]Make getList async (#16221)
92d229db95c is described below
commit 92d229db95c61ef70861f18b3cb91b0c3963a3ce
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Thu Jun 30 10:42:17 2022 +0800
[improve][broker][PIP-149]Make getList async (#16221)
---
.../broker/admin/impl/PersistentTopicsBase.java | 38 +++++++++++++++++++---
.../pulsar/broker/admin/v1/PersistentTopics.java | 18 +++++-----
.../pulsar/broker/admin/v2/PersistentTopics.java | 18 +++++-----
.../org/apache/pulsar/broker/admin/AdminTest.java | 2 +-
4 files changed, 55 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4c5c4a836a7..5815e31d535 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -183,6 +183,32 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ protected CompletableFuture<List<String>>
internalGetListAsync(Optional<String> bundle) {
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.GET_TOPICS)
+ .thenCompose(__ ->
namespaceResources().namespaceExistsAsync(namespaceName))
+ .thenAccept(exists -> {
+ if (!exists) {
+ throw new RestException(Status.NOT_FOUND, "Namespace does
not exist");
+ }
+ })
+ .thenCompose(__ ->
topicResources().listPersistentTopicsAsync(namespaceName))
+ .thenApply(topics ->
+ topics.stream()
+ .filter(topic -> {
+ if (isTransactionInternalName(TopicName.get(topic))) {
+ return false;
+ }
+ if (bundle.isPresent()) {
+ NamespaceBundle b =
pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(TopicName.get(topic));
+ return b != null &&
bundle.get().equals(b.getBundleRange());
+ }
+ return true;
+ })
+ .collect(Collectors.toList())
+ );
+ }
+
protected CompletableFuture<List<String>> internalGetListAsync() {
return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.GET_TOPICS)
.thenCompose(__ ->
namespaceResources().namespaceExistsAsync(namespaceName))
@@ -4249,18 +4275,22 @@ public class PersistentTopicsBase extends AdminResource
{
return getPartitionedTopicMetadataAsync(
TopicName.get(topicName.getPartitionedTopicName()), false,
false)
- .thenApply(partitionedTopicMetadata -> {
+ .thenAccept(partitionedTopicMetadata -> {
if (partitionedTopicMetadata == null ||
partitionedTopicMetadata.partitions == 0) {
final String topicErrorType = partitionedTopicMetadata
== null ? "has no metadata" : "has zero
partitions";
throw new RestException(Status.NOT_FOUND,
String.format(
"Partitioned Topic not found: %s %s",
topicName.toString(), topicErrorType));
- } else if
(!internalGetList(Optional.empty()).contains(topicName.toString())) {
+ }
+ })
+ .thenCompose(__ -> internalGetListAsync(Optional.empty()))
+ .thenApply(topics -> {
+ if (!topics.contains(topicName.toString())) {
throw new RestException(Status.NOT_FOUND, "Topic
partitions were not yet created");
}
throw new RestException(Status.NOT_FOUND,
-
getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
- });
+
getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
+ });
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 028c17da3d3..a114bf54cca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -71,14 +71,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle) {
- try {
- validateNamespaceName(property, cluster, namespace);
- asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalGetListAsync(Optional.ofNullable(bundle))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic list {}",
clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1fa09d747a1..bac4145871b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -100,14 +100,16 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("bundle") String bundle,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
- try {
- validateNamespaceName(tenant, namespace);
-
asyncResponse.resume(filterSystemTopic(internalGetList(Optional.ofNullable(bundle)),
includeSystemTopic));
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalGetListAsync(Optional.ofNullable(bundle))
+ .thenAccept(topicList ->
asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic list {}",
clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 425acf6c664..2a4e204c044 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -792,7 +792,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.getList(response, property, cluster, namespace, null);
- verify(response, times(1)).resume(Lists.newArrayList());
+ verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
// create topic
response = mock(AsyncResponse.class);
persistentTopics.getPartitionedTopicList(response, property, cluster,
namespace);