AnonHxy commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r914650812


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to