This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ea417fb494e [improve][broker][PIP-149]Make resetCursor async (#16355)
ea417fb494e is described below
commit ea417fb494e23637beaaa796f515b12b4982959d
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Wed Jul 13 17:33:51 2022 +0800
[improve][broker][PIP-149]Make resetCursor async (#16355)
---
.../broker/admin/impl/PersistentTopicsBase.java | 177 ++++++++-------------
.../pulsar/broker/admin/v1/PersistentTopics.java | 27 +++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 27 +++-
.../pulsar/broker/admin/PersistentTopicsTest.java | 1 +
4 files changed, 102 insertions(+), 130 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 529294ef437..3782cf1ab43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2036,29 +2036,31 @@ public class PersistentTopicsBase extends AdminResource
{
});
}
- protected void internalResetCursor(AsyncResponse asyncResponse, String
subName, long timestamp,
+ protected CompletableFuture<Void> internalResetCursorAsync(String subName,
long timestamp,
boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}: {}",
- clientAppId(), topicName,
- subName, timestamp, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
+ return future
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(__ -> {
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+ return internalResetCursorForNonPartitionedTopic(subName,
timestamp, authoritative);
+ } else {
+ return internalResetCursorForPartitionedTopic(subName,
timestamp, authoritative);
+ }
+ });
+ }
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
-
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalResetCursorForNonPartitionedTopic(asyncResponse, subName,
timestamp, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
+ private CompletableFuture<Void>
internalResetCursorForPartitionedTopic(String subName, long timestamp,
+
boolean authoritative) {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenCompose(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new
CompletableFuture<>();
@@ -2070,126 +2072,73 @@ public class PersistentTopicsBase extends
AdminResource {
TopicName topicNamePartition =
topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
-
.resetCursorAsync(topicNamePartition.toString(),
- subName, timestamp).handle((r, ex)
-> {
- if (ex != null) {
- if (ex instanceof
PreconditionFailedException) {
- // throw the last exception if all
partitions get this error
- // any other exception on partition is
reported back to user
- failureCount.incrementAndGet();
- partitionException.set(ex);
- } else {
- log.warn("[{}] [{}] Failed to reset
cursor on subscription {} to time {}",
+
.resetCursorAsync(topicNamePartition.toString(),
+ subName, timestamp).handle((r, ex) -> {
+ if (ex != null) {
+ if (ex instanceof
PreconditionFailedException) {
+ // throw the last exception if all
partitions get this error
+ // any other exception on
partition is reported back to user
+ failureCount.incrementAndGet();
+ partitionException.set(ex);
+ } else {
+ log.warn("[{}] [{}] Failed to
reset cursor on subscription {} to time {}",
clientAppId(),
topicNamePartition, subName, timestamp, ex);
- future.completeExceptionally(ex);
- return null;
+ future.completeExceptionally(ex);
+ return null;
+ }
}
- }
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
+ if (count.decrementAndGet() == 0) {
+ future.complete(null);
+ }
- return null;
- });
+ return null;
+ });
} catch (Exception e) {
log.warn("[{}] [{}] Failed to reset cursor on
subscription {} to time {}", clientAppId(),
- topicNamePartition, subName, timestamp, e);
+ topicNamePartition, subName, timestamp, e);
future.completeExceptionally(e);
}
}
- future.whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) ex));
- return;
- } else {
- asyncResponse.resume(new RestException(ex));
- return;
- }
- }
-
+ return future.whenComplete((r, ex) -> {
// report an error to user if unable to reset for all
partitions
if (failureCount.get() == numPartitions) {
log.warn("[{}] [{}] Failed to reset cursor on
subscription {} to time {}",
- clientAppId(), topicName,
- subName, timestamp,
partitionException.get());
- asyncResponse.resume(
- new
RestException(Status.PRECONDITION_FAILED,
-
partitionException.get().getMessage()));
- return;
+ clientAppId(), topicName,
+ subName, timestamp, partitionException.get());
+ throw new
RestException(Status.PRECONDITION_FAILED,
partitionException.get().getMessage());
} else if (failureCount.get() > 0) {
log.warn("[{}] [{}] Partial errors for reset
cursor on subscription {} to time {}",
- clientAppId(), topicName, subName,
timestamp, partitionException.get());
+ clientAppId(), topicName, subName, timestamp,
partitionException.get());
}
-
- asyncResponse.resume(Response.noContent().build());
});
} else {
- internalResetCursorForNonPartitionedTopic(asyncResponse,
subName, timestamp, authoritative);
+ return internalResetCursorForNonPartitionedTopic(subName,
timestamp, authoritative);
}
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log
it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to expire messages for all
subscription on topic {}",
- clientAppId(), topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
});
- }
}
- private void internalResetCursorForNonPartitionedTopic(AsyncResponse
asyncResponse, String subName, long timestamp,
+ private CompletableFuture<Void>
internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
-
- log.info("[{}] [{}] Received reset cursor on subscription {} to
time {}",
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(__ -> {
+ log.info("[{}] [{}] Received reset cursor on subscription {}
to time {}",
clientAppId(), topicName, subName, timestamp);
-
- PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
- if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- getTopicNotFoundErrorMessage(topicName.toString())));
- return;
- }
- PersistentSubscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- getSubNotFoundErrorMessage(topicName.toString(),
subName)));
- return;
- }
- sub.resetCursor(timestamp).thenRun(() -> {
- log.info("[{}][{}] Reset cursor on subscription {} to time
{}", clientAppId(), topicName, subName,
- timestamp);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable t = (ex instanceof CompletionException ?
ex.getCause() : ex);
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}", clientAppId(), topicName,
- subName, timestamp, t);
- if (t instanceof SubscriptionInvalidCursorPosition) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Unable to find position for timestamp specified:
" + t.getMessage()));
- } else if (t instanceof SubscriptionBusyException) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Failed for Subscription Busy: " +
t.getMessage()));
- } else {
- resumeAsyncResponseExceptionally(asyncResponse, t);
+ return getTopicReferenceAsync(topicName);
+ })
+ .thenCompose(topic -> {
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(),
subName));
}
- return null;
- });
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {} to
time {}",
- clientAppId(), topicName, subName, timestamp, e);
- if (e instanceof NotAllowedException) {
- asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
- } else {
- resumeAsyncResponseExceptionally(asyncResponse, e);
- }
- }
+ return sub.resetCursor(timestamp);
+ })
+ .thenRun(() ->
+ log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+ clientAppId(), topicName, subName, timestamp));
}
protected void internalCreateSubscription(AsyncResponse asyncResponse,
String subscriptionName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 5e5c4ab0d5f..51a0cb1598d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -696,14 +696,25 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String encodedSubName,
@PathParam("timestamp") long timestamp,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- try {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalResetCursor(asyncResponse, decode(encodedSubName),
timestamp, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalResetCursorAsync(decode(encodedSubName), timestamp,
authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (!isRedirectException(t)) {
+ log.error("[{}][{}] Failed to reset cursor on subscription
{} to time {}",
+ clientAppId(), topicName, encodedSubName, timestamp,
t);
+ }
+ if (t instanceof
BrokerServiceException.SubscriptionInvalidCursorPosition) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp specified: " +
t.getMessage());
+ } else if (t instanceof
BrokerServiceException.SubscriptionBusyException) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index edf49d410ad..28dc8015ff8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1589,14 +1589,25 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("timestamp") long timestamp,
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- try {
- validateTopicName(tenant, namespace, encodedTopic);
- internalResetCursor(asyncResponse, decode(encodedSubName),
timestamp, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalResetCursorAsync(decode(encodedSubName), timestamp,
authoritative)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (!isRedirectException(t)) {
+ log.error("[{}][{}] Failed to reset cursor on subscription
{} to time {}",
+ clientAppId(), topicName, encodedSubName, timestamp,
t);
+ }
+ if (t instanceof
BrokerServiceException.SubscriptionInvalidCursorPosition) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp specified: " +
t.getMessage());
+ } else if (t instanceof
BrokerServiceException.SubscriptionBusyException) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fdda0bc5745..9fe28655594 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1445,6 +1445,7 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
doReturn(brokerService).when(pulsar).getBrokerService();
CompletableFuture<Optional<Topic>> completableFuture = new
CompletableFuture<>();
doReturn(completableFuture).when(brokerService).getTopicIfExists(topic);
+ completableFuture.completeExceptionally(new
RuntimeException("TimeoutException"));
try {
admin.topics().resetCursor(topic, "my-sub",
System.currentTimeMillis());
Assert.fail();