This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce24db11ada [improve][broker][PIP-149]make internalPeekNthMessage
method async (#16192)
ce24db11ada is described below
commit ce24db11adaf60d9cfb302ce39e76cdc9e43bd2d
Author: Qiang Huang <[email protected]>
AuthorDate: Sat Jun 25 19:41:40 2022 +0800
[improve][broker][PIP-149]make internalPeekNthMessage method async (#16192)
* [improve][broker][PIP-149]make internalPeekNthMessage method async
* fix UT
---
.../broker/admin/impl/PersistentTopicsBase.java | 91 ++++++++++++----------
.../pulsar/broker/admin/v1/PersistentTopics.java | 15 +++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 14 +++-
3 files changed, 75 insertions(+), 45 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8b84cd74d74..5ec93241921 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2805,50 +2805,59 @@ public class PersistentTopicsBase extends AdminResource
{
}
}
- protected Response internalPeekNthMessage(String subName, int
messagePosition, boolean authoritative) {
+ protected CompletableFuture<Response> internalPeekNthMessageAsync(String
subName, int messagePosition,
+ boolean
authoritative) {
+ CompletableFuture<Void> ret;
// If the topic name is a partition name, no need to get partition
topic metadata again
- if (!topicName.isPartitioned() &&
getPartitionedTopicMetadata(topicName,
- authoritative, false).partitions > 0) {
- throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages
on a partitioned topic is not allowed");
- }
-
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
- if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
- log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName,
- subName);
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Peek messages on a non-persistent topic is not allowed");
- }
-
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
- PersistentReplicator repl = null;
- PersistentSubscription sub = null;
- Entry entry = null;
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- repl = getReplicatorReference(subName, topic);
+ if (!topicName.isPartitioned()) {
+ ret = getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenCompose(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Peek messages on a partitioned topic is
not allowed");
+ }
+ return CompletableFuture.completedFuture(null);
+ });
} else {
- sub = (PersistentSubscription) getSubscriptionReference(subName,
topic);
- }
- try {
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- entry = repl.peekNthMessage(messagePosition).get();
- } else {
- entry = sub.peekNthMessage(messagePosition).get();
- }
- return generateResponseWithEntry(entry);
- } catch (NullPointerException npe) {
- throw new RestException(Status.NOT_FOUND, "Message not found");
- } catch (Exception exception) {
- log.error("[{}] Failed to peek message at position {} from {} {}",
clientAppId(), messagePosition,
- topicName, subName, exception);
- throw new RestException(exception);
- } finally {
- if (entry != null) {
- entry.release();
- }
+ ret = CompletableFuture.completedFuture(null);
}
+ return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ CompletableFuture<Entry> entry;
+ if (!(topic instanceof PersistentTopic)) {
+ log.error("[{}] Not supported operation of
non-persistent topic {} {}", clientAppId(),
+ topicName, subName);
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Peek messages on a non-persistent topic is
not allowed");
+ } else {
+ if (subName.startsWith(((PersistentTopic)
topic).getReplicatorPrefix())) {
+ PersistentReplicator repl =
getReplicatorReference(subName, (PersistentTopic) topic);
+ entry = repl.peekNthMessage(messagePosition);
+ } else {
+ PersistentSubscription sub =
+ (PersistentSubscription)
getSubscriptionReference(subName, (PersistentTopic) topic);
+ entry = sub.peekNthMessage(messagePosition);
+ }
+ }
+ return entry;
+ }).thenCompose(entry -> {
+ try {
+ Response response = generateResponseWithEntry(entry);
+ return CompletableFuture.completedFuture(response);
+ } catch (NullPointerException npe) {
+ throw new RestException(Status.NOT_FOUND, "Message not
found");
+ } catch (Exception exception) {
+ log.error("[{}] Failed to peek message at position {}
from {} {}", clientAppId(),
+ messagePosition, topicName, subName,
exception);
+ throw new RestException(exception);
+ } finally {
+ if (entry != null) {
+ entry.release();
+ }
+ }
+ });
}
protected Response internalExamineMessage(String initialPosition, long
messagePosition, boolean authoritative) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index c6c1e189984..e492f799195 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -740,12 +740,23 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the
message position does not exist") })
- public Response peekNthMessage(@PathParam("property") String property,
@PathParam("cluster") String cluster,
+ public void peekNthMessage(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
@PathParam("subName") String encodedSubName,
@PathParam("messagePosition") int messagePosition,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalPeekNthMessage(decode(encodedSubName), messagePosition,
authoritative);
+ internalPeekNthMessageAsync(decode(encodedSubName), messagePosition,
authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get peek nth message for
topic {} subscription {}", clientAppId(),
+ topicName, decode(encodedSubName), ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ad063381343..88067344949 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1694,7 +1694,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global
cluster configuration")})
- public Response peekNthMessage(
+ public void peekNthMessage(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -1708,7 +1709,16 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Is authentication required to perform this
operation")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalPeekNthMessage(decode(encodedSubName), messagePosition,
authoritative);
+ internalPeekNthMessageAsync(decode(encodedSubName), messagePosition,
authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get peek nth message for
topic {} subscription {}", clientAppId(),
+ topicName, decode(encodedSubName), ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET