AnonHxy commented on code in PR #16464:
URL: https://github.com/apache/pulsar/pull/16464#discussion_r918478698
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4920,73 +4920,45 @@ protected void handleTopicPolicyException(String
methodName, Throwable thr, Asyn
resumeAsyncResponseExceptionally(asyncResponse, cause);
}
- protected void internalTruncateNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
- Topic topic;
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- topic = getTopicReference(topicName);
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}", clientAppId(),
topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- CompletableFuture<Void> future = topic.truncate();
- future.thenAccept(a -> {
- asyncResponse.resume(new
RestException(Response.Status.NO_CONTENT.getStatusCode(),
- Response.Status.NO_CONTENT.getReasonPhrase()));
- }).exceptionally(e -> {
- asyncResponse.resume(e);
- return null;
- });
+ protected CompletableFuture<Void>
internalTruncateNonPartitionedTopicAsync(boolean authoritative) {
+ return validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(Topic::truncate);
}
- protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean
authoritative) {
+ protected CompletableFuture<Void> internalTruncateTopicAsync(boolean
authoritative) {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
- internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
+ return internalTruncateNonPartitionedTopicAsync(authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative,
false).whenComplete((meta, t) -> {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative,
false).thenCompose(meta -> {
if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ final List<CompletableFuture<Void>> futures = new
ArrayList<>(meta.partitions);
for (int i = 0; i < meta.partitions; i++) {
TopicName topicNamePartition =
topicName.getPartition(i);
try {
- futures.add(pulsar().getAdminClient().topics()
+ futures.add(
+ pulsar().getAdminClient().topics()
.truncateAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
+ return FutureUtil.failedFuture(new
RestException(e));
}
}
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
+ return FutureUtil.waitForAll(futures).handle((result,
exception) -> {
if (exception != null) {
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to truncate topic {}",
clientAppId(), topicName, exception);
- asyncResponse.resume(new
RestException(exception));
+ throw new RestException(Status.NOT_FOUND,
th.getMessage());
Review Comment:
Fixed .PTAL @Technoboy-
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]