Technoboy- commented on a change in pull request #13880:
URL: https://github.com/apache/pulsar/pull/13880#discussion_r792460904
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1802,123 +1802,141 @@ protected void internalSkipMessages(String subName,
int numMessages, boolean aut
protected void internalExpireMessagesForAllSubscriptions(AsyncResponse
asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to expire messages for all subscription
on topic {}",
- clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
-
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
- expireTimeInSeconds, authoritative);
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future = CompletableFuture.completedFuture(null);
+ }
+ future.thenAccept(__ ->
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenAccept(partitionMetadata -> {
+ if (topicName.isPartitioned()) {
+
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+ partitionMetadata, expireTimeInSeconds,
authoritative);
+ } else {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
+
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
- // expire messages for each partition topic
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
+ // expire messages for each partition topic
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+
.expireMessagesForAllSubscriptionsAsync(
+
topicNamePartition.toString(), expireTimeInSeconds));
+ } catch (Exception e) {
+ log.error("[{}] Failed to expire messages
up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
+ topicNamePartition, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ log.error("[{}] Failed to expire messages
up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
+ topicName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
+
asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+ partitionMetadata, expireTimeInSeconds,
authoritative);
+ }
+ }
+ }
+ )
+ ).exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to expire messages for all subscription on
topic {}", clientAppId(), topicName,
+ cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
+
+ }
+
+ private void
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse
asyncResponse,
+
PartitionedTopicMetadata
+
partitionMetadata,
+
int expireTimeInSeconds,
+
boolean authoritative) {
+ try {
+ // validate ownership and redirect if current broker is not owner
+ validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES)
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenAccept(__ ->
getTopicReferenceAsync(topicName).thenAccept(t -> {
Review comment:
I think we should use thenCompose here:
.thenCompose(__ -> getTopicReferenceAsync(topicName)).xxx
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]