AnonHxy commented on a change in pull request #13901:
URL: https://github.com/apache/pulsar/pull/13901#discussion_r790138568
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
}
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String
subName, boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
- clientAppId(), subName, topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalSkipAllMessagesForNonPartitionedTopic(asyncResponse,
subName, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future.thenRun(() -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenRun(() -> validateTopicOperationAsync(topicName,
TopicOperation.SKIP, subName))
+ .thenRun(() -> {
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
authoritative);
+ } else {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar()
- .getAdminClient()
- .topics()
-
.skipAllMessagesAsync(topicNamePartition.toString(),
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+
.skipAllMessagesAsync(topicNamePartition.toString(),
subName));
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages {} {}",
- clientAppId(), topicNamePartition,
subName, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
+ } catch (Exception e) {
+ log.error("[{}] Failed to skip all
messages {} {}",
+ clientAppId(), topicNamePartition,
subName, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Subscription not found"));
- return null;
- } else {
- log.error("[{}] Failed to skip all messages {}
{}",
- clientAppId(), topicName, subName, t);
- asyncResponse.resume(new RestException(t));
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(
+ new
RestException(Status.NOT_FOUND, "Subscription not found"));
+ return null;
+ } else {
+ log.error("[{}] Failed to skip all
messages {} {}",
+ clientAppId(), topicName, subName,
t);
+ asyncResponse.resume(new
RestException(t));
+ return null;
+ }
+ }
+
+
asyncResponse.resume(Response.noContent().build());
return null;
- }
+ });
+ } else {
+
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
authoritative);
}
-
- asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
+ clientAppId(), subName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
- } else {
-
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
}
}).exceptionally(ex -> {
log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
Review comment:
OK, I see there is a better method `FutureUtil.unwrapCompletionException`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]