Jason918 commented on a change in pull request #13880:
URL: https://github.com/apache/pulsar/pull/13880#discussion_r792354936
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1802,123 +1802,140 @@ protected void internalSkipMessages(String subName,
int numMessages, boolean aut
protected void internalExpireMessagesForAllSubscriptions(AsyncResponse
asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to expire messages for all subscription
on topic {}",
- clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
-
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
- expireTimeInSeconds, authoritative);
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future = CompletableFuture.completedFuture(null);
+ }
+ future.thenAccept(__ ->
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenAccept(partitionMetadata -> {
+ if (topicName.isPartitioned()) {
+
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+ partitionMetadata, expireTimeInSeconds,
authoritative);
+ } else {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
+
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
- // expire messages for each partition topic
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar()
- .getAdminClient()
- .topics()
- .expireMessagesForAllSubscriptionsAsync(
- topicNamePartition.toString(),
expireTimeInSeconds));
- } catch (Exception e) {
- log.error("[{}] Failed to expire messages up to {}
on {}",
- clientAppId(), expireTimeInSeconds,
- topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
+ // expire messages for each partition topic
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+
.expireMessagesForAllSubscriptionsAsync(
+
topicNamePartition.toString(), expireTimeInSeconds));
+ } catch (Exception e) {
+ log.error("[{}] Failed to expire messages
up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
+ topicNamePartition, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable t = exception.getCause();
- log.error("[{}] Failed to expire messages up to {}
on {}",
- clientAppId(), expireTimeInSeconds,
- topicName, t);
- asyncResponse.resume(new RestException(t));
- return null;
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ log.error("[{}] Failed to expire messages
up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
+ topicName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
+
asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+ partitionMetadata, expireTimeInSeconds,
authoritative);
}
-
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
- } else {
-
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
- expireTimeInSeconds, authoritative);
+ }
}
- }).exceptionally(ex -> {
+ )
+ ).exceptionally(ex -> {
Review comment:
Let's use `Throwable cause = FutureUtil.unwrapCompletionException(ex);`
to keep error handling in this class consistent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]