Technoboy- commented on a change in pull request #13841:
URL: https://github.com/apache/pulsar/pull/13841#discussion_r794407092



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1958,135 @@ private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
     }
 
     protected void internalResetCursor(AsyncResponse asyncResponse, String 
subName, long timestamp,
-            boolean authoritative) {
+                                       boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} 
to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, 
subName);
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
+                .thenCompose(__ -> {
+                    log.info("[{}] [{}] Received reset cursor on subscription 
{} to time {}",
+                            clientAppId(), topicName, subName, timestamp);
+                    // If the topic name is a partition name, no need to get 
partition topic metadata again
+                    if (topicName.isPartitioned()) {
+                        return 
internalResetCursorForNonPartitionedTopicAsync(subName, timestamp);
+                    } else {
+                        return 
internalResetCursorForPartitionedTopicAsync(subName, timestamp, authoritative);
+                    }
+                }).whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        log.info("[{}][{}] Reset cursor on subscription {} to 
time {}", clientAppId(), topicName,
+                                subName, timestamp);
+                        asyncResponse.resume(Response.noContent().build());
+                        return;
+                    }
+
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] [{}] Failed to reset cursor on 
subscription {} to time {}", clientAppId(),
+                            topicName, subName, timestamp, cause);
+
+                    if (cause instanceof SubscriptionInvalidCursorPosition) {
+                        asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Unable to find position for timestamp 
specified: " + cause.getMessage()));
+                    } else if (cause instanceof SubscriptionBusyException) {
+                        asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Failed for Subscription Busy: " + 
cause.getMessage()));
+                    } else if (cause instanceof NotAllowedException) {
+                        asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, cause.getMessage()));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    }
+                });
+    }
+
+    private CompletableFuture<Void> 
internalResetCursorForPartitionedTopicAsync(String subName, long timestamp,
+                                                                               
 boolean authoritative) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                .thenCompose(partitionMetadata -> {
+                    final int numPartitions = partitionMetadata.partitions;
+                    if (numPartitions <= 0) {
+                        return 
internalResetCursorForNonPartitionedTopicAsync(subName, timestamp);
+                    }
 
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalResetCursorForNonPartitionedTopic(asyncResponse, subName, 
timestamp, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                final int numPartitions = partitionMetadata.partitions;
-                if (numPartitions > 0) {
-                    final CompletableFuture<Void> future = new 
CompletableFuture<>();
-                    final AtomicInteger count = new 
AtomicInteger(numPartitions);
                     final AtomicInteger failureCount = new AtomicInteger(0);
                     final AtomicReference<Throwable> partitionException = new 
AtomicReference<>();
-
-                    for (int i = 0; i < numPartitions; i++) {
+                    final List<CompletableFuture<?>> futures = 
IntStream.range(0, numPartitions).mapToObj(i -> {
                         TopicName topicNamePartition = 
topicName.getPartition(i);
                         try {
+                            CompletableFuture<Void> future = new 
CompletableFuture<>();
                             pulsar().getAdminClient().topics()
-                                    
.resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) 
-> {
-                                if (ex != null) {
-                                    if (ex instanceof 
PreconditionFailedException) {
-                                        // throw the last exception if all 
partitions get this error
-                                        // any other exception on partition is 
reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset 
cursor on subscription {} to time {}",
-                                                clientAppId(), 
topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
-                                    }
-                                }
-
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
-
-                                return null;
-                            });
-                        } catch (Exception e) {
-                            log.warn("[{}] [{}] Failed to reset cursor on 
subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
-                            future.completeExceptionally(e);
-                        }
-                    }
+                                    
.resetCursorAsync(topicNamePartition.toString(), subName, timestamp)
+                                    .handle((___, ex) -> {
+                                        if (ex != null) {
+                                            if (ex instanceof 
PreconditionFailedException) {
+                                                // throw the last exception if 
all partitions get
+                                                // this error
+                                                // any other exception on 
partition is reported
+                                                // back to user
+                                                failureCount.incrementAndGet();
+                                                partitionException.set(ex);
+                                            } else {
+                                                log.warn("[{}] [{}] Failed to 
reset cursor on subscription {} to time "
+                                                                + "{}", 
clientAppId(), topicNamePartition, subName,
+                                                        timestamp, ex);
+                                                
future.completeExceptionally(ex);
+                                                return null;
+                                            }
+                                        }
 
-                    future.whenComplete((r, ex) -> {
-                        if (ex != null) {
-                            if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new 
RestException((PulsarAdminException) ex));
-                                return;
-                            } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
-                            }
+                                        future.complete(null);
+                                        return null;
+                                    });
+                            return future;
+                        } catch (PulsarServerException ex) {
+                            log.error("[{}] Failed to get admin client while 
delete partition {}", clientAppId(),
+                                    topicNamePartition, ex);
+                            return FutureUtil.failedFuture(ex);
                         }
+                    }).collect(Collectors.toList());
 
+                    return FutureUtil.waitForAll(futures).thenCompose((__) -> {
                         // report an error to user if unable to reset for all 
partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on 
subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, 
partitionException.get());
-                            asyncResponse.resume(
-                                    new 
RestException(Status.PRECONDITION_FAILED,
-                                            
partitionException.get().getMessage()));
-                            return;
+                                    clientAppId(), topicName, subName, 
timestamp, partitionException.get());
+                            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                                    partitionException.get().getMessage()));
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset 
cursor on subscription {} to time {}",
                                     clientAppId(), topicName, subName, 
timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
+                        return CompletableFuture.completedFuture(null);
                     });
-                } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, 
subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                log.error("[{}] Failed to expire messages for all subscription 
on topic {}",
-                        clientAppId(), topicName, ex);
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
-            });
-        }
+                });
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse 
asyncResponse, String subName, long timestamp,
-                                       boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, 
subName);

Review comment:
       Why were these two lines (2064~2065) removed? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to