This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new e15a51ff3d6 [improve][broker][PIP-149]Make resetCursor async (#16355) (#16774)
e15a51ff3d6 is described below
commit e15a51ff3d618fc3e60d9fd9d3baac173597ef3f
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Jul 26 12:28:08 2022 +0800
[improve][broker][PIP-149]Make resetCursor async (#16355) (#16774)
(cherry picked from commit ea417fb494e23637beaaa796f515b12b4982959d)
Signed-off-by: Zixuan Liu <[email protected]>
Co-authored-by: Xiaoyu Hou <[email protected]>
---
.../broker/admin/impl/PersistentTopicsBase.java | 174 ++++++++-------------
.../pulsar/broker/admin/v1/PersistentTopics.java | 29 +++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 29 +++-
.../pulsar/broker/admin/PersistentTopicsTest.java | 1 +
4 files changed, 105 insertions(+), 128 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9c553f273ef..e677746bfeb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2018,29 +2018,31 @@ public class PersistentTopicsBase extends AdminResource
{
});
}
- protected void internalResetCursor(AsyncResponse asyncResponse, String
subName, long timestamp,
+ protected CompletableFuture<Void> internalResetCursorAsync(String subName,
long timestamp,
boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}: {}",
- clientAppId(), topicName,
- subName, timestamp, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
+ return future
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(__ -> {
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+ return internalResetCursorForNonPartitionedTopic(subName,
timestamp, authoritative);
+ } else {
+ return internalResetCursorForPartitionedTopic(subName,
timestamp, authoritative);
+ }
+ });
+ }
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
-
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalResetCursorForNonPartitionedTopic(asyncResponse, subName,
timestamp, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
+ private CompletableFuture<Void>
internalResetCursorForPartitionedTopic(String subName, long timestamp,
+
boolean authoritative) {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenCompose(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new
CompletableFuture<>();
@@ -2052,124 +2054,72 @@ public class PersistentTopicsBase extends
AdminResource {
TopicName topicNamePartition =
topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
-
.resetCursorAsync(topicNamePartition.toString(),
- subName, timestamp).handle((r, ex)
-> {
- if (ex != null) {
- if (ex instanceof
PreconditionFailedException) {
- // throw the last exception if all
partitions get this error
- // any other exception on partition is
reported back to user
- failureCount.incrementAndGet();
- partitionException.set(ex);
- } else {
- log.warn("[{}] [{}] Failed to reset
cursor on subscription {} to time {}",
+
.resetCursorAsync(topicNamePartition.toString(),
+ subName, timestamp).handle((r, ex) -> {
+ if (ex != null) {
+ if (ex instanceof
PreconditionFailedException) {
+ // throw the last exception if all
partitions get this error
+ // any other exception on
partition is reported back to user
+ failureCount.incrementAndGet();
+ partitionException.set(ex);
+ } else {
+ log.warn("[{}] [{}] Failed to
reset cursor on subscription {} to time {}",
clientAppId(),
topicNamePartition, subName, timestamp, ex);
- future.completeExceptionally(ex);
- return null;
+ future.completeExceptionally(ex);
+ return null;
+ }
}
- }
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
+ if (count.decrementAndGet() == 0) {
+ future.complete(null);
+ }
- return null;
- });
+ return null;
+ });
} catch (Exception e) {
log.warn("[{}] [{}] Failed to reset cursor on
subscription {} to time {}", clientAppId(),
- topicNamePartition, subName, timestamp, e);
+ topicNamePartition, subName, timestamp, e);
future.completeExceptionally(e);
}
}
- future.whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) ex));
- return;
- } else {
- asyncResponse.resume(new RestException(ex));
- return;
- }
- }
-
+ return future.whenComplete((r, ex) -> {
// report an error to user if unable to reset for all
partitions
if (failureCount.get() == numPartitions) {
log.warn("[{}] [{}] Failed to reset cursor on
subscription {} to time {}",
- clientAppId(), topicName,
- subName, timestamp,
partitionException.get());
- asyncResponse.resume(
- new
RestException(Status.PRECONDITION_FAILED,
-
partitionException.get().getMessage()));
- return;
+ clientAppId(), topicName,
+ subName, timestamp, partitionException.get());
+ throw new
RestException(Status.PRECONDITION_FAILED,
partitionException.get().getMessage());
} else if (failureCount.get() > 0) {
log.warn("[{}] [{}] Partial errors for reset
cursor on subscription {} to time {}",
- clientAppId(), topicName, subName,
timestamp, partitionException.get());
+ clientAppId(), topicName, subName, timestamp,
partitionException.get());
}
-
- asyncResponse.resume(Response.noContent().build());
});
} else {
- internalResetCursorForNonPartitionedTopic(asyncResponse,
subName, timestamp, authoritative);
- }
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log
it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to expire messages for all
subscription on topic {}",
- clientAppId(), topicName, ex);
+ return internalResetCursorForNonPartitionedTopic(subName,
timestamp, authoritative);
}
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
});
- }
}
- private void internalResetCursorForNonPartitionedTopic(AsyncResponse
asyncResponse, String subName, long timestamp,
+ private CompletableFuture<Void>
internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
-
- log.info("[{}] [{}] Received reset cursor on subscription {} to
time {}",
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(__ -> {
+ log.info("[{}] [{}] Received reset cursor on subscription {}
to time {}",
clientAppId(), topicName, subName, timestamp);
-
- PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
- if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic not found"));
- return;
- }
- PersistentSubscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- sub.resetCursor(timestamp).thenRun(() -> {
- log.info("[{}][{}] Reset cursor on subscription {} to time
{}", clientAppId(), topicName, subName,
- timestamp);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable t = (ex instanceof CompletionException ?
ex.getCause() : ex);
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}", clientAppId(), topicName,
- subName, timestamp, t);
- if (t instanceof SubscriptionInvalidCursorPosition) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Unable to find position for timestamp specified:
" + t.getMessage()));
- } else if (t instanceof SubscriptionBusyException) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Failed for Subscription Busy: " +
t.getMessage()));
- } else {
- resumeAsyncResponseExceptionally(asyncResponse, t);
+ return getTopicReferenceAsync(topicName);
+ })
+ .thenCompose(topic -> {
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND, "Subscription not
found");
}
- return null;
- });
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {} to
time {}",
- clientAppId(), topicName, subName, timestamp, e);
- if (e instanceof NotAllowedException) {
- asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
- } else {
- resumeAsyncResponseExceptionally(asyncResponse, e);
- }
- }
+ return sub.resetCursor(timestamp);
+ })
+ .thenRun(() ->
+ log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+ clientAppId(), topicName, subName, timestamp));
}
protected void internalCreateSubscription(AsyncResponse asyncResponse,
String subscriptionName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index c37ce8186c9..a29bd3f9845 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -44,6 +44,7 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.MessageId;
@@ -54,6 +55,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -601,14 +603,25 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String encodedSubName,
@PathParam("timestamp") long timestamp,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- try {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalResetCursor(asyncResponse, decode(encodedSubName),
timestamp, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalResetCursorAsync(decode(encodedSubName), timestamp,
authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (!isRedirectException(t)) {
+ log.error("[{}][{}] Failed to reset cursor on subscription
{} to time {}",
+ clientAppId(), topicName, encodedSubName, timestamp,
t);
+ }
+ if (t instanceof
BrokerServiceException.SubscriptionInvalidCursorPosition) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp specified: " +
t.getMessage());
+ } else if (t instanceof
BrokerServiceException.SubscriptionBusyException) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c871446b58a..5d23760de79 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -46,6 +46,7 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.MessageId;
@@ -71,6 +72,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1495,14 +1497,25 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("timestamp") long timestamp,
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- try {
- validateTopicName(tenant, namespace, encodedTopic);
- internalResetCursor(asyncResponse, decode(encodedSubName),
timestamp, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalResetCursorAsync(decode(encodedSubName), timestamp,
authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (!isRedirectException(t)) {
+ log.error("[{}][{}] Failed to reset cursor on subscription
{} to time {}",
+ clientAppId(), topicName, encodedSubName, timestamp,
t);
+ }
+ if (t instanceof
BrokerServiceException.SubscriptionInvalidCursorPosition) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp specified: " +
t.getMessage());
+ } else if (t instanceof
BrokerServiceException.SubscriptionBusyException) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 4ccfc08e9b5..5dc6a649406 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1244,6 +1244,7 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
doReturn(brokerService).when(pulsar).getBrokerService();
CompletableFuture<Optional<Topic>> completableFuture = new
CompletableFuture<>();
doReturn(completableFuture).when(brokerService).getTopicIfExists(topic);
+ completableFuture.completeExceptionally(new
RuntimeException("TimeoutException"));
try {
admin.topics().resetCursor(topic, "my-sub",
System.currentTimeMillis());
Assert.fail();