This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new e15a51ff3d6 [improve][broker][PIP-149]Make resetCursor async (#16355) 
(#16774)
e15a51ff3d6 is described below

commit e15a51ff3d618fc3e60d9fd9d3baac173597ef3f
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Jul 26 12:28:08 2022 +0800

    [improve][broker][PIP-149]Make resetCursor async (#16355) (#16774)
    
    (cherry picked from commit ea417fb494e23637beaaa796f515b12b4982959d)
    Signed-off-by: Zixuan Liu <[email protected]>
    
    Co-authored-by: Xiaoyu Hou <[email protected]>
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 174 ++++++++-------------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  29 +++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  29 +++-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   1 +
 4 files changed, 105 insertions(+), 128 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9c553f273ef..e677746bfeb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2018,29 +2018,31 @@ public class PersistentTopicsBase extends AdminResource 
{
         });
     }
 
-    protected void internalResetCursor(AsyncResponse asyncResponse, String 
subName, long timestamp,
+    protected CompletableFuture<Void> internalResetCursorAsync(String subName, 
long timestamp,
             boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} 
to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
+        return future
+            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                // If the topic name is a partition name, no need to get 
partition topic metadata again
+                if (topicName.isPartitioned()) {
+                    return internalResetCursorForNonPartitionedTopic(subName, 
timestamp, authoritative);
+                } else {
+                    return internalResetCursorForPartitionedTopic(subName, 
timestamp, authoritative);
+                }
+            });
+    }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, 
subName);
-
-        // If the topic name is a partition name, no need to get partition 
topic metadata again
-        if (topicName.isPartitioned()) {
-            internalResetCursorForNonPartitionedTopic(asyncResponse, subName, 
timestamp, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
+    private CompletableFuture<Void> 
internalResetCursorForPartitionedTopic(String subName, long timestamp,
+                                                                           
boolean authoritative) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+            .thenCompose(partitionMetadata -> {
                 final int numPartitions = partitionMetadata.partitions;
                 if (numPartitions > 0) {
                     final CompletableFuture<Void> future = new 
CompletableFuture<>();
@@ -2052,124 +2054,72 @@ public class PersistentTopicsBase extends 
AdminResource {
                         TopicName topicNamePartition = 
topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    
.resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) 
-> {
-                                if (ex != null) {
-                                    if (ex instanceof 
PreconditionFailedException) {
-                                        // throw the last exception if all 
partitions get this error
-                                        // any other exception on partition is 
reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset 
cursor on subscription {} to time {}",
+                                
.resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof 
PreconditionFailedException) {
+                                            // throw the last exception if all 
partitions get this error
+                                            // any other exception on 
partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to 
reset cursor on subscription {} to time {}",
                                                 clientAppId(), 
topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on 
subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
-                        if (ex != null) {
-                            if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new 
RestException((PulsarAdminException) ex));
-                                return;
-                            } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
-                            }
-                        }
-
+                    return future.whenComplete((r, ex) -> {
                         // report an error to user if unable to reset for all 
partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on 
subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, 
partitionException.get());
-                            asyncResponse.resume(
-                                    new 
RestException(Status.PRECONDITION_FAILED,
-                                            
partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new 
RestException(Status.PRECONDITION_FAILED, 
partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset 
cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, 
timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, 
partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, 
subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log 
it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all 
subscription on topic {}",
-                            clientAppId(), topicName, ex);
+                    return internalResetCursorForNonPartitionedTopic(subName, 
timestamp, authoritative);
                 }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse 
asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> 
internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, 
subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to 
time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} 
to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time 
{}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
-                Throwable t = (ex instanceof CompletionException ? 
ex.getCause() : ex);
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} 
to time {}", clientAppId(), topicName,
-                        subName, timestamp, t);
-                if (t instanceof SubscriptionInvalidCursorPosition) {
-                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                            "Unable to find position for timestamp specified: 
" + t.getMessage()));
-                } else if (t instanceof SubscriptionBusyException) {
-                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                            "Failed for Subscription Busy: " + 
t.getMessage()));
-                } else {
-                    resumeAsyncResponseExceptionally(asyncResponse, t);
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                   throw new RestException(Status.NOT_FOUND, "Subscription not 
found");
                 }
-                return null;
-            });
-        } catch (Exception e) {
-            log.warn("[{}][{}] Failed to reset cursor on subscription {} to 
time {}",
-                    clientAppId(), topicName, subName, timestamp, e);
-            if (e instanceof NotAllowedException) {
-                asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
-            } else {
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-            }
-        }
+                return sub.resetCursor(timestamp);
+            })
+            .thenRun(() ->
+                log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+                    clientAppId(), topicName, subName, timestamp));
     }
 
     protected void internalCreateSubscription(AsyncResponse asyncResponse, 
String subscriptionName,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index c37ce8186c9..a29bd3f9845 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -44,6 +44,7 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
@@ -54,6 +55,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -601,14 +603,25 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic, 
@PathParam("subName") String encodedSubName,
             @PathParam("timestamp") long timestamp,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        try {
-            validateTopicName(property, cluster, namespace, encodedTopic);
-            internalResetCursor(asyncResponse, decode(encodedSubName), 
timestamp, authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateTopicName(property, cluster, namespace, encodedTopic);
+        internalResetCursorAsync(decode(encodedSubName), timestamp, 
authoritative)
+            .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                Throwable t = FutureUtil.unwrapCompletionException(ex);
+                if (!isRedirectException(t)) {
+                    log.error("[{}][{}] Failed to reset cursor on subscription 
{} to time {}",
+                        clientAppId(), topicName, encodedSubName, timestamp, 
t);
+                }
+                if (t instanceof 
BrokerServiceException.SubscriptionInvalidCursorPosition) {
+                    t = new RestException(Response.Status.PRECONDITION_FAILED,
+                        "Unable to find position for timestamp specified: " + 
t.getMessage());
+                } else if (t instanceof 
BrokerServiceException.SubscriptionBusyException) {
+                    t = new RestException(Response.Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage());
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, t);
+                return null;
+            });
     }
 
     @POST
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c871446b58a..5d23760de79 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -46,6 +46,7 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
@@ -71,6 +72,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1495,14 +1497,25 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("timestamp") long timestamp,
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        try {
-            validateTopicName(tenant, namespace, encodedTopic);
-            internalResetCursor(asyncResponse, decode(encodedSubName), 
timestamp, authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalResetCursorAsync(decode(encodedSubName), timestamp, 
authoritative)
+            .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                Throwable t = FutureUtil.unwrapCompletionException(ex);
+                if (!isRedirectException(t)) {
+                    log.error("[{}][{}] Failed to reset cursor on subscription 
{} to time {}",
+                        clientAppId(), topicName, encodedSubName, timestamp, 
t);
+                }
+                if (t instanceof 
BrokerServiceException.SubscriptionInvalidCursorPosition) {
+                    t = new RestException(Response.Status.PRECONDITION_FAILED,
+                        "Unable to find position for timestamp specified: " + 
t.getMessage());
+                } else if (t instanceof 
BrokerServiceException.SubscriptionBusyException) {
+                    t = new RestException(Response.Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage());
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, t);
+                return null;
+            });
     }
 
     @PUT
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 4ccfc08e9b5..5dc6a649406 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1244,6 +1244,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         doReturn(brokerService).when(pulsar).getBrokerService();
         CompletableFuture<Optional<Topic>> completableFuture = new 
CompletableFuture<>();
         
doReturn(completableFuture).when(brokerService).getTopicIfExists(topic);
+        completableFuture.completeExceptionally(new 
RuntimeException("TimeoutException"));
         try {
             admin.topics().resetCursor(topic, "my-sub", 
System.currentTimeMillis());
             Assert.fail();

Reply via email to