codelipenghui commented on a change in pull request #13841:
URL: https://github.com/apache/pulsar/pull/13841#discussion_r792323757
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1957,138 @@ private void
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
}
protected void internalResetCursor(AsyncResponse asyncResponse, String
subName, long timestamp,
- boolean authoritative) {
+ boolean authoritative) {
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}: {}",
- clientAppId(), topicName,
- subName, timestamp, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
Review comment:
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1957,138 @@ private void
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
}
protected void internalResetCursor(AsyncResponse asyncResponse, String
subName, long timestamp,
- boolean authoritative) {
+ boolean authoritative) {
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}: {}",
- clientAppId(), topicName,
- subName, timestamp, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
+ future.thenCompose(unused -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(unused -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(unused -> {
Review comment:
```suggestion
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
.thenCompose(__ -> {
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1957,138 @@ private void
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
}
protected void internalResetCursor(AsyncResponse asyncResponse, String
subName, long timestamp,
- boolean authoritative) {
+ boolean authoritative) {
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}: {}",
- clientAppId(), topicName,
- subName, timestamp, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
+ future.thenCompose(unused -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(unused -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(unused -> {
+ log.info("[{}] [{}] Received reset cursor on subscription
{} to time {}",
+ clientAppId(), topicName, subName, timestamp);
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+ return
internalResetCursorForNonPartitionedTopicAsync(subName, timestamp);
+ } else {
+ return
internalResetCursorForPartitionedTopicAsync(subName, timestamp, authoritative);
+ }
+ }).whenComplete((unc, ex) -> {
+ if (ex == null) {
+ log.info("[{}][{}] Reset cursor on subscription {} to
time {}", clientAppId(), topicName,
+ subName, timestamp);
+ asyncResponse.resume(Response.noContent().build());
+ return;
+ }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalResetCursorForNonPartitionedTopic(asyncResponse, subName,
timestamp, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- final int numPartitions = partitionMetadata.partitions;
- if (numPartitions > 0) {
- final CompletableFuture<Void> future = new
CompletableFuture<>();
- final AtomicInteger count = new
AtomicInteger(numPartitions);
- final AtomicInteger failureCount = new AtomicInteger(0);
- final AtomicReference<Throwable> partitionException = new
AtomicReference<>();
+ Throwable cause = (ex instanceof CompletionException ?
ex.getCause() : ex);
Review comment:
FutureUtil.unwrapCompletionException()
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1957,138 @@ private void
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
}
protected void internalResetCursor(AsyncResponse asyncResponse, String
subName, long timestamp,
- boolean authoritative) {
+ boolean authoritative) {
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}: {}",
- clientAppId(), topicName,
- subName, timestamp, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
+ future.thenCompose(unused -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(unused -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(unused -> {
+ log.info("[{}] [{}] Received reset cursor on subscription
{} to time {}",
+ clientAppId(), topicName, subName, timestamp);
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+ return
internalResetCursorForNonPartitionedTopicAsync(subName, timestamp);
+ } else {
+ return
internalResetCursorForPartitionedTopicAsync(subName, timestamp, authoritative);
+ }
+ }).whenComplete((unc, ex) -> {
+ if (ex == null) {
+ log.info("[{}][{}] Reset cursor on subscription {} to
time {}", clientAppId(), topicName,
+ subName, timestamp);
+ asyncResponse.resume(Response.noContent().build());
+ return;
+ }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalResetCursorForNonPartitionedTopic(asyncResponse, subName,
timestamp, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- final int numPartitions = partitionMetadata.partitions;
- if (numPartitions > 0) {
- final CompletableFuture<Void> future = new
CompletableFuture<>();
- final AtomicInteger count = new
AtomicInteger(numPartitions);
- final AtomicInteger failureCount = new AtomicInteger(0);
- final AtomicReference<Throwable> partitionException = new
AtomicReference<>();
+ Throwable cause = (ex instanceof CompletionException ?
ex.getCause() : ex);
+ log.error("[{}] [{}] Failed to reset cursor on
subscription {} to time {}", clientAppId(),
+ topicName, subName, timestamp, cause);
+
+ if (cause instanceof SubscriptionInvalidCursorPosition) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp
specified: " + cause.getMessage()));
+ } else if (cause instanceof SubscriptionBusyException) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " +
cause.getMessage()));
+ } else if (cause instanceof NotAllowedException) {
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, cause.getMessage()));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ }
+ });
+ }
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- pulsar().getAdminClient().topics()
-
.resetCursorAsync(topicNamePartition.toString(),
- subName, timestamp).handle((r, ex)
-> {
- if (ex != null) {
- if (ex instanceof
PreconditionFailedException) {
- // throw the last exception if all
partitions get this error
- // any other exception on partition is
reported back to user
- failureCount.incrementAndGet();
- partitionException.set(ex);
- } else {
- log.warn("[{}] [{}] Failed to reset
cursor on subscription {} to time {}",
- clientAppId(),
topicNamePartition, subName, timestamp, ex);
- future.completeExceptionally(ex);
- return null;
- }
- }
+ private CompletableFuture<Void>
internalResetCursorForPartitionedTopicAsync(String subName, long timestamp,
+
boolean authoritative) {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenCompose(partitionMetadata -> {
+ final int numPartitions = partitionMetadata.partitions;
+ if (numPartitions > 0) {
+ final CompletableFuture<Void> result = new
CompletableFuture<>();
+ final AtomicInteger count = new
AtomicInteger(numPartitions);
+ final AtomicInteger failureCount = new
AtomicInteger(0);
+ final AtomicReference<Throwable> partitionException =
new AtomicReference<>();
Review comment:
Please check here
https://github.com/apache/pulsar/pull/13805/files#diff-66aeb65a64cbe7c541f013ae807c5bcbeab567bef77706c7ff2e0cbfe0d77eb1R623
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]