This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 61f99cb [Broker] Fix call sync method in async rest api for
internalExpireMessagesByPosition (#13878)
61f99cb is described below
commit 61f99cb9676c8eb77d9dc2c3bb8a6aa2115896c0
Author: Dezhi LIiu <[email protected]>
AuthorDate: Thu Jan 27 15:39:01 2022 +0800
[Broker] Fix call sync method in async rest api for
internalExpireMessagesByPosition (#13878)
### Motivation
Avoid call sync method in async rest API for
PersistentTopicsBase#internalExpireMessagesByPosition.
### Modifications
Use async instead of sync method.
---
.../broker/admin/impl/PersistentTopicsBase.java | 98 +++++++++++++---------
1 file changed, 58 insertions(+), 40 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a137fbd..5acb796 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3505,39 +3505,52 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalExpireMessagesByPosition(AsyncResponse
asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId,
boolean isExcluded, int batchIndex) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to expire messages on subscription
{} to position {}: {}", clientAppId(),
- topicName, subName, messageId, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
-
- log.info("[{}][{}] received expire messages on subscription {} to
position {}", clientAppId(), topicName,
- subName, messageId);
+ future.thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> {
+ log.info("[{}][{}] received expire messages on
subscription {} to position {}", clientAppId(),
+ topicName, subName, messageId);
+ return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
+ .thenAccept(partitionMetadata -> {
+ if (!topicName.isPartitioned() &&
partitionMetadata.partitions > 0) {
+ String msg = "Expire message at position
is not supported for partitioned-topic";
+ log.warn("[{}] {} {}({}) {}",
clientAppId(), msg, topicName, messageId, subName);
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, msg));
+ return;
+ } else if (messageId.getPartitionIndex() !=
topicName.getPartitionIndex()) {
+ String msg = "Invalid parameter for expire
message by position, partition index of "
+ + "passed in message position
doesn't match partition index for the topic";
+ log.warn("[{}] {} {}({}).", clientAppId(),
msg, topicName, messageId);
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED, msg));
+ return;
+ } else {
+
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
+ messageId, isExcluded, batchIndex);
+ }
+ });
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to expire messages up to {} on subscription
{} to position {}",
+ clientAppId(), topicName, subName, messageId, cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
+ }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (!topicName.isPartitioned() &&
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
- log.warn("[{}] Not supported operation expire message up to {} on
partitioned-topic {} {}",
- clientAppId(), messageId, topicName, subName);
- asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
- "Expire message at position is not supported for
partitioned-topic"));
- return;
- } else if (messageId.getPartitionIndex() !=
topicName.getPartitionIndex()) {
- log.warn("[{}] Invalid parameter for expire message by position,
partition index of passed in message"
- + " position {} doesn't match partition index of
topic requested {}.",
- clientAppId(), messageId, topicName);
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Invalid parameter for expire message by position,
partition index of message position "
- + "passed in doesn't match partition index for the
topic."));
- } else {
- PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
+ private CompletableFuture<Void>
internalExpireMessagesNonPartitionedTopicByPosition(AsyncResponse asyncResponse,
+
String subName,
+
MessageIdImpl messageId,
+
boolean isExcluded,
+
int batchIndex) {
+ return getTopicReferenceAsync(topicName).thenAccept(t -> {
+ PersistentTopic topic = (PersistentTopic) t;
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic not found"));
return;
@@ -3545,7 +3558,8 @@ public class PersistentTopicsBase extends AdminResource {
try {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Subscription not found"));
return;
}
CompletableFuture<Integer> batchSizeFuture = new
CompletableFuture<>();
@@ -3570,23 +3584,22 @@ public class PersistentTopicsBase extends AdminResource
{
} else {
if (log.isDebugEnabled()) {
log.debug("Expire message by position not
issued on topic {} for subscription {} "
- + "due to ongoing message expiration
not finished or subscription "
- + "almost catch up.", topicName,
subName);
+ + "due to ongoing message expiration
not finished or subscription almost "
+ + "catch up.", topicName, subName);
}
throw new RestException(Status.CONFLICT, "Expire
message by position not issued on topic "
- + topicName + " for subscription " +
subName + " due to ongoing message expiration"
- + " not finished or invalid message
position provided.");
+ + topicName + " for subscription " +
subName + " due to ongoing"
+ + " message expiration not finished or
invalid message position provided.");
}
- } catch (NullPointerException npe) {
- throw new RestException(Status.NOT_FOUND,
"Subscription not found");
} catch (Exception exception) {
log.error("[{}] Failed to expire messages up to {} on
{} with subscription {} {}",
clientAppId(), position, topicName, subName,
exception);
throw new RestException(exception);
}
+ asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
- log.error("[{}] Failed to expire messages up to {} on {}
with subscription {} {}", clientAppId(),
- messageId, topicName, subName, e);
+ log.error("[{}] Failed to expire messages up to {} on {}
with subscription {} {}",
+ clientAppId(), messageId, topicName, subName, e);
asyncResponse.resume(e);
return null;
});
@@ -3595,8 +3608,13 @@ public class PersistentTopicsBase extends AdminResource {
clientAppId(), topicName, messageId, subName,
messageId, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
- }
- asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to expire messages up to {} on subscription
{} to position {}", clientAppId(),
+ topicName, subName, messageId, cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
}
protected void internalTriggerCompaction(AsyncResponse asyncResponse,
boolean authoritative) {