AnonHxy commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r916513586
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,86 @@ protected void internalResetCursor(AsyncResponse
asyncResponse, String subName,
TopicName topicNamePartition =
topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
-
.resetCursorAsync(topicNamePartition.toString(),
- subName, timestamp).handle((r, ex)
-> {
- if (ex != null) {
- if (ex instanceof
PreconditionFailedException) {
- // throw the last exception if all
partitions get this error
- // any other exception on partition is
reported back to user
- failureCount.incrementAndGet();
- partitionException.set(ex);
- } else {
- log.warn("[{}] [{}] Failed to reset
cursor on subscription {} to time {}",
+
.resetCursorAsync(topicNamePartition.toString(),
+ subName, timestamp).handle((r, ex) -> {
+ if (ex != null) {
+ if (ex instanceof
PreconditionFailedException) {
+ // throw the last exception if all
partitions get this error
+ // any other exception on
partition is reported back to user
+ failureCount.incrementAndGet();
+ partitionException.set(ex);
+ } else {
+ log.warn("[{}] [{}] Failed to
reset cursor on subscription {} to time {}",
clientAppId(),
topicNamePartition, subName, timestamp, ex);
- future.completeExceptionally(ex);
- return null;
+ future.completeExceptionally(ex);
+ return null;
+ }
}
- }
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
+ if (count.decrementAndGet() == 0) {
+ future.complete(null);
+ }
- return null;
- });
+ return null;
+ });
} catch (Exception e) {
log.warn("[{}] [{}] Failed to reset cursor on
subscription {} to time {}", clientAppId(),
- topicNamePartition, subName, timestamp, e);
+ topicNamePartition, subName, timestamp, e);
future.completeExceptionally(e);
}
}
- future.whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) ex));
- return;
- } else {
- asyncResponse.resume(new RestException(ex));
- return;
- }
- }
-
+ return future.whenComplete((r, ex) -> {
// report an error to user if unable to reset for all
partitions
if (failureCount.get() == numPartitions) {
log.warn("[{}] [{}] Failed to reset cursor on
subscription {} to time {}",
- clientAppId(), topicName,
- subName, timestamp,
partitionException.get());
- asyncResponse.resume(
- new
RestException(Status.PRECONDITION_FAILED,
-
partitionException.get().getMessage()));
- return;
+ clientAppId(), topicName,
+ subName, timestamp, partitionException.get());
+ throw new
RestException(Status.PRECONDITION_FAILED,
partitionException.get().getMessage());
} else if (failureCount.get() > 0) {
log.warn("[{}] [{}] Partial errors for reset
cursor on subscription {} to time {}",
- clientAppId(), topicName, subName,
timestamp, partitionException.get());
+ clientAppId(), topicName, subName, timestamp,
partitionException.get());
}
-
- asyncResponse.resume(Response.noContent().build());
});
} else {
- internalResetCursorForNonPartitionedTopic(asyncResponse,
subName, timestamp, authoritative);
+ return internalResetCursorForNonPartitionedTopic(subName,
timestamp, authoritative);
}
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log
it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to expire messages for all
subscription on topic {}",
- clientAppId(), topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
});
- }
}
- private void internalResetCursorForNonPartitionedTopic(AsyncResponse
asyncResponse, String subName, long timestamp,
+ private CompletableFuture<Void>
internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
-
- log.info("[{}] [{}] Received reset cursor on subscription {} to
time {}",
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(__ -> {
+ log.info("[{}] [{}] Received reset cursor on subscription {}
to time {}",
clientAppId(), topicName, subName, timestamp);
-
- PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
- if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- getTopicNotFoundErrorMessage(topicName.toString())));
- return;
- }
- PersistentSubscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- getSubNotFoundErrorMessage(topicName.toString(),
subName)));
- return;
- }
- sub.resetCursor(timestamp).thenRun(() -> {
- log.info("[{}][{}] Reset cursor on subscription {} to time
{}", clientAppId(), topicName, subName,
- timestamp);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
+ return getTopicReferenceAsync(topicName);
+ })
+ .thenCompose(topic -> {
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(),
subName));
+ }
+ return sub.resetCursor(timestamp);
+ })
+ .thenRun(() -> log.info("[{}][{}] Reset cursor on subscription {}
to time {}",
+ clientAppId(), topicName, subName, timestamp))
+ .exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ?
ex.getCause() : ex);
log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}", clientAppId(), topicName,
- subName, timestamp, t);
+ subName, timestamp, t);
if (t instanceof SubscriptionInvalidCursorPosition) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Unable to find position for timestamp specified:
" + t.getMessage()));
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp specified: " +
t.getMessage());
} else if (t instanceof SubscriptionBusyException) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Failed for Subscription Busy: " +
t.getMessage()));
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage());
} else {
- resumeAsyncResponseExceptionally(asyncResponse, t);
+ throw new RestException(t);
}
Review Comment:
Makes sense. Moving RestException to the REST layer looks a little better.
@Technoboy- @mattisonchao
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]