liudezhi2098 commented on a change in pull request #13880:
URL: https://github.com/apache/pulsar/pull/13880#discussion_r793223068
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1802,123 +1802,135 @@ protected void internalSkipMessages(String subName,
int numMessages, boolean aut
protected void internalExpireMessagesForAllSubscriptions(AsyncResponse
asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to expire messages for all subscription
on topic {}",
- clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
-
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
- expireTimeInSeconds, authoritative);
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future = CompletableFuture.completedFuture(null);
+ }
+ future.thenCompose(__ ->
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenAccept(partitionMetadata -> {
+ if (topicName.isPartitioned()) {
+
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+ partitionMetadata, expireTimeInSeconds,
authoritative);
+ } else {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
+
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
- // expire messages for each partition topic
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
+ // expire messages for each partition topic
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+
.expireMessagesForAllSubscriptionsAsync(
+
topicNamePartition.toString(), expireTimeInSeconds));
+ } catch (Exception e) {
+ log.error("[{}] Failed to expire messages
up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
+ topicNamePartition, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ log.error("[{}] Failed to expire messages
up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
+ topicName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
+
asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+ partitionMetadata, expireTimeInSeconds,
authoritative);
+ }
+ }
+ }
+ )
+ ).exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to expire messages for all subscription on
topic {}", clientAppId(), topicName,
+ cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
+
+ }
+
+ private void
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse
asyncResponse,
+
PartitionedTopicMetadata
+
partitionMetadata,
+
int expireTimeInSeconds,
+
boolean authoritative) {
+ try {
+ // validate ownership and redirect if current broker is not owner
+ validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES)
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ ->
getTopicReferenceAsync(topicName).thenAccept(t -> {
+ if (t == null) {
+ resumeAsyncResponseExceptionally(asyncResponse, new
RestException(Status.NOT_FOUND,
+ "Topic not found"));
+ return;
+ }
+ if (!(t instanceof PersistentTopic)) {
+ resumeAsyncResponseExceptionally(asyncResponse, new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Expire messages for all subscriptions on a
non-persistent topic is not allowed"));
+ return;
+ }
+ PersistentTopic topic = (PersistentTopic) t;
+ final List<CompletableFuture<Void>> futures =
+ Lists.newArrayListWithCapacity((int)
topic.getReplicators().size());
+ List<String> subNames =
+ Lists.newArrayListWithCapacity((int)
topic.getReplicators().size()
+ + (int) topic.getSubscriptions().size());
+ subNames.addAll(topic.getReplicators().keys());
+ subNames.addAll(topic.getSubscriptions().keys());
+ for (int i = 0; i < subNames.size(); i++) {
try {
- futures.add(pulsar()
- .getAdminClient()
- .topics()
- .expireMessagesForAllSubscriptionsAsync(
- topicNamePartition.toString(),
expireTimeInSeconds));
+
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
+ subNames.get(i), expireTimeInSeconds,
authoritative));
} catch (Exception e) {
- log.error("[{}] Failed to expire messages up to {}
on {}",
- clientAppId(), expireTimeInSeconds,
- topicNamePartition, e);
+ log.error("[{}] Failed to expire messages for all
subscription up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
topicName, e);
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(futures).handle((result, exception)
-> {
if (exception != null) {
- Throwable t = exception.getCause();
- log.error("[{}] Failed to expire messages up to {}
on {}",
- clientAppId(), expireTimeInSeconds,
- topicName, t);
- asyncResponse.resume(new RestException(t));
+ Throwable throwable = exception.getCause();
+ log.error("[{}] Failed to expire messages for all
subscription up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
topicName, throwable);
+ asyncResponse.resume(new RestException(throwable));
return null;
}
-
asyncResponse.resume(Response.noContent().build());
return null;
});
- } else {
-
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
- expireTimeInSeconds, authoritative);
- }
- }).exceptionally(ex -> {
- log.error("[{}] Failed to expire messages for all subscription
on topic {}",
- clientAppId(), topicName, ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- }
- }
- private void
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse
asyncResponse,
-
int expireTimeInSeconds,
-
boolean authoritative) {
- // validate ownership and redirect if current broker is not owner
- PersistentTopic topic;
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
- topic = (PersistentTopic) getTopicReference(topicName);
- } catch (WebApplicationException wae) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to expire messages for all subscription
on topic {},"
- + " redirecting to other brokers.",
- clientAppId(), topicName, wae);
- }
- resumeAsyncResponseExceptionally(asyncResponse, wae);
- return;
- } catch (Exception e) {
- log.error("[{}] Failed to expire messages for all subscription on
topic {}",
- clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- final AtomicReference<Throwable> exception = new AtomicReference<>();
-
- topic.getReplicators().forEach((subName, replicator) -> {
- try {
- internalExpireMessagesByTimestampForSinglePartition(subName,
expireTimeInSeconds, authoritative);
- } catch (Throwable t) {
- exception.set(t);
- }
- });
-
- topic.getSubscriptions().forEach((subName, subscriber) -> {
- try {
- internalExpireMessagesByTimestampForSinglePartition(subName,
expireTimeInSeconds, authoritative);
- } catch (Throwable t) {
- exception.set(t);
- }
- });
-
- if (exception.get() != null) {
- if (exception.get() instanceof WebApplicationException) {
- WebApplicationException wae = (WebApplicationException)
exception.get();
- asyncResponse.resume(wae);
- return;
- } else {
- asyncResponse.resume(new RestException(exception.get()));
- return;
+ })
+ ).exceptionally(e -> {
+ Throwable throwable = e.getCause();
+ log.error("[{}] Failed to expire messages for all
subscription up to {} on {}", clientAppId(),
+ expireTimeInSeconds, topicName, throwable);
+ asyncResponse.resume(new RestException(throwable));
+ return null;
+ });
+ } catch (Exception e) {
Review comment:
It will throw `new RestException(Status.UNAUTHORIZED, "Need to
authenticate to perform the request")`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]