codelipenghui commented on a change in pull request #13841:
URL: https://github.com/apache/pulsar/pull/13841#discussion_r792323757



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1957,138 @@ private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
     }
 
     protected void internalResetCursor(AsyncResponse asyncResponse, String 
subName, long timestamp,
-            boolean authoritative) {
+                                       boolean authoritative) {
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} 
to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);

Review comment:
       if (topicName.isGlobal()) {
               future = validateGlobalNamespaceOwnershipAsync(namespaceName);
   } else {
                future = CompletableFuture.completedFuture(null);
   }

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1957,138 @@ private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
     }
 
     protected void internalResetCursor(AsyncResponse asyncResponse, String 
subName, long timestamp,
-            boolean authoritative) {
+                                       boolean authoritative) {
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} 
to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, 
subName);
+        future.thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
+                .thenCompose(unused -> {

Review comment:
       ```suggestion
           future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                   .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
                   .thenCompose(__ -> {
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1957,138 @@ private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
     }
 
     protected void internalResetCursor(AsyncResponse asyncResponse, String 
subName, long timestamp,
-            boolean authoritative) {
+                                       boolean authoritative) {
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} 
to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, 
subName);
+        future.thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
+                .thenCompose(unused -> {
+                    log.info("[{}] [{}] Received reset cursor on subscription 
{} to time {}",
+                            clientAppId(), topicName, subName, timestamp);
+                    // If the topic name is a partition name, no need to get 
partition topic metadata again
+                    if (topicName.isPartitioned()) {
+                        return 
internalResetCursorForNonPartitionedTopicAsync(subName, timestamp);
+                    } else {
+                        return 
internalResetCursorForPartitionedTopicAsync(subName, timestamp, authoritative);
+                    }
+                }).whenComplete((unc, ex) -> {
+                    if (ex == null) {
+                        log.info("[{}][{}] Reset cursor on subscription {} to 
time {}", clientAppId(), topicName,
+                                subName, timestamp);
+                        asyncResponse.resume(Response.noContent().build());
+                        return;
+                    }
 
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalResetCursorForNonPartitionedTopic(asyncResponse, subName, 
timestamp, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                final int numPartitions = partitionMetadata.partitions;
-                if (numPartitions > 0) {
-                    final CompletableFuture<Void> future = new 
CompletableFuture<>();
-                    final AtomicInteger count = new 
AtomicInteger(numPartitions);
-                    final AtomicInteger failureCount = new AtomicInteger(0);
-                    final AtomicReference<Throwable> partitionException = new 
AtomicReference<>();
+                    Throwable cause = (ex instanceof CompletionException ? 
ex.getCause() : ex);

Review comment:
       FutureUtil.unwrapCompletionException()

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1957,138 @@ private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
     }
 
     protected void internalResetCursor(AsyncResponse asyncResponse, String 
subName, long timestamp,
-            boolean authoritative) {
+                                       boolean authoritative) {
+        CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} 
to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
         }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, 
subName);
+        future.thenCompose(unused -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
+                .thenCompose(unused -> {
+                    log.info("[{}] [{}] Received reset cursor on subscription 
{} to time {}",
+                            clientAppId(), topicName, subName, timestamp);
+                    // If the topic name is a partition name, no need to get 
partition topic metadata again
+                    if (topicName.isPartitioned()) {
+                        return 
internalResetCursorForNonPartitionedTopicAsync(subName, timestamp);
+                    } else {
+                        return 
internalResetCursorForPartitionedTopicAsync(subName, timestamp, authoritative);
+                    }
+                }).whenComplete((unc, ex) -> {
+                    if (ex == null) {
+                        log.info("[{}][{}] Reset cursor on subscription {} to 
time {}", clientAppId(), topicName,
+                                subName, timestamp);
+                        asyncResponse.resume(Response.noContent().build());
+                        return;
+                    }
 
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalResetCursorForNonPartitionedTopic(asyncResponse, subName, 
timestamp, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
-                final int numPartitions = partitionMetadata.partitions;
-                if (numPartitions > 0) {
-                    final CompletableFuture<Void> future = new 
CompletableFuture<>();
-                    final AtomicInteger count = new 
AtomicInteger(numPartitions);
-                    final AtomicInteger failureCount = new AtomicInteger(0);
-                    final AtomicReference<Throwable> partitionException = new 
AtomicReference<>();
+                    Throwable cause = (ex instanceof CompletionException ? 
ex.getCause() : ex);
+                    log.error("[{}] [{}] Failed to reset cursor on 
subscription {} to time {}", clientAppId(),
+                            topicName, subName, timestamp, cause);
+
+                    if (cause instanceof SubscriptionInvalidCursorPosition) {
+                        asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Unable to find position for timestamp 
specified: " + cause.getMessage()));
+                    } else if (cause instanceof SubscriptionBusyException) {
+                        asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Failed for Subscription Busy: " + 
cause.getMessage()));
+                    } else if (cause instanceof NotAllowedException) {
+                        asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, cause.getMessage()));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    }
+                });
+    }
 
-                    for (int i = 0; i < numPartitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            pulsar().getAdminClient().topics()
-                                    
.resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) 
-> {
-                                if (ex != null) {
-                                    if (ex instanceof 
PreconditionFailedException) {
-                                        // throw the last exception if all 
partitions get this error
-                                        // any other exception on partition is 
reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset 
cursor on subscription {} to time {}",
-                                                clientAppId(), 
topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
-                                    }
-                                }
+    private CompletableFuture<Void> 
internalResetCursorForPartitionedTopicAsync(String subName, long timestamp,
+                                                                               
 boolean authoritative) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                .thenCompose(partitionMetadata -> {
+                    final int numPartitions = partitionMetadata.partitions;
+                    if (numPartitions > 0) {
+                        final CompletableFuture<Void> result = new 
CompletableFuture<>();
+                        final AtomicInteger count = new 
AtomicInteger(numPartitions);
+                        final AtomicInteger failureCount = new 
AtomicInteger(0);
+                        final AtomicReference<Throwable> partitionException = 
new AtomicReference<>();

Review comment:
       Please check here 
https://github.com/apache/pulsar/pull/13805/files#diff-66aeb65a64cbe7c541f013ae807c5bcbeab567bef77706c7ff2e0cbfe0d77eb1R623




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to