Technoboy- commented on a change in pull request #13841:
URL: https://github.com/apache/pulsar/pull/13841#discussion_r794407092
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1957,154 +1958,135 @@ private void
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
}
protected void internalResetCursor(AsyncResponse asyncResponse, String
subName, long timestamp,
- boolean authoritative) {
+ boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {}
to time {}: {}",
- clientAppId(), topicName,
- subName, timestamp, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
+ future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(__ -> {
+ log.info("[{}] [{}] Received reset cursor on subscription
{} to time {}",
+ clientAppId(), topicName, subName, timestamp);
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+ return
internalResetCursorForNonPartitionedTopicAsync(subName, timestamp);
+ } else {
+ return
internalResetCursorForPartitionedTopicAsync(subName, timestamp, authoritative);
+ }
+ }).whenComplete((__, ex) -> {
+ if (ex == null) {
+ log.info("[{}][{}] Reset cursor on subscription {} to
time {}", clientAppId(), topicName,
+ subName, timestamp);
+ asyncResponse.resume(Response.noContent().build());
+ return;
+ }
+
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] [{}] Failed to reset cursor on
subscription {} to time {}", clientAppId(),
+ topicName, subName, timestamp, cause);
+
+ if (cause instanceof SubscriptionInvalidCursorPosition) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp
specified: " + cause.getMessage()));
+ } else if (cause instanceof SubscriptionBusyException) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " +
cause.getMessage()));
+ } else if (cause instanceof NotAllowedException) {
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, cause.getMessage()));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ }
+ });
+ }
+
+ private CompletableFuture<Void>
internalResetCursorForPartitionedTopicAsync(String subName, long timestamp,
+
boolean authoritative) {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenCompose(partitionMetadata -> {
+ final int numPartitions = partitionMetadata.partitions;
+ if (numPartitions <= 0) {
+ return
internalResetCursorForNonPartitionedTopicAsync(subName, timestamp);
+ }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalResetCursorForNonPartitionedTopic(asyncResponse, subName,
timestamp, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- final int numPartitions = partitionMetadata.partitions;
- if (numPartitions > 0) {
- final CompletableFuture<Void> future = new
CompletableFuture<>();
- final AtomicInteger count = new
AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
final AtomicReference<Throwable> partitionException = new
AtomicReference<>();
-
- for (int i = 0; i < numPartitions; i++) {
+ final List<CompletableFuture<?>> futures =
IntStream.range(0, numPartitions).mapToObj(i -> {
TopicName topicNamePartition =
topicName.getPartition(i);
try {
+ CompletableFuture<Void> future = new
CompletableFuture<>();
pulsar().getAdminClient().topics()
-
.resetCursorAsync(topicNamePartition.toString(),
- subName, timestamp).handle((r, ex)
-> {
- if (ex != null) {
- if (ex instanceof
PreconditionFailedException) {
- // throw the last exception if all
partitions get this error
- // any other exception on partition is
reported back to user
- failureCount.incrementAndGet();
- partitionException.set(ex);
- } else {
- log.warn("[{}] [{}] Failed to reset
cursor on subscription {} to time {}",
- clientAppId(),
topicNamePartition, subName, timestamp, ex);
- future.completeExceptionally(ex);
- return null;
- }
- }
-
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
-
- return null;
- });
- } catch (Exception e) {
- log.warn("[{}] [{}] Failed to reset cursor on
subscription {} to time {}", clientAppId(),
- topicNamePartition, subName, timestamp, e);
- future.completeExceptionally(e);
- }
- }
+
.resetCursorAsync(topicNamePartition.toString(), subName, timestamp)
+ .handle((___, ex) -> {
+ if (ex != null) {
+ if (ex instanceof
PreconditionFailedException) {
+ // throw the last exception if
all partitions get
+ // this error
+ // any other exception on
partition is reported
+ // back to user
+ failureCount.incrementAndGet();
+ partitionException.set(ex);
+ } else {
+ log.warn("[{}] [{}] Failed to
reset cursor on subscription {} to time "
+ + "{}",
clientAppId(), topicNamePartition, subName,
+ timestamp, ex);
+
future.completeExceptionally(ex);
+ return null;
+ }
+ }
- future.whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) ex));
- return;
- } else {
- asyncResponse.resume(new RestException(ex));
- return;
- }
+ future.complete(null);
+ return null;
+ });
+ return future;
+ } catch (PulsarServerException ex) {
+ log.error("[{}] Failed to get admin client while
delete partition {}", clientAppId(),
+ topicNamePartition, ex);
+ return FutureUtil.failedFuture(ex);
}
+ }).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures).thenCompose((__) -> {
// report an error to user if unable to reset for all
partitions
if (failureCount.get() == numPartitions) {
log.warn("[{}] [{}] Failed to reset cursor on
subscription {} to time {}",
- clientAppId(), topicName,
- subName, timestamp,
partitionException.get());
- asyncResponse.resume(
- new
RestException(Status.PRECONDITION_FAILED,
-
partitionException.get().getMessage()));
- return;
+ clientAppId(), topicName, subName,
timestamp, partitionException.get());
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ partitionException.get().getMessage()));
} else if (failureCount.get() > 0) {
log.warn("[{}] [{}] Partial errors for reset
cursor on subscription {} to time {}",
clientAppId(), topicName, subName,
timestamp, partitionException.get());
}
-
- asyncResponse.resume(Response.noContent().build());
+ return CompletableFuture.completedFuture(null);
});
- } else {
- internalResetCursorForNonPartitionedTopic(asyncResponse,
subName, timestamp, authoritative);
- }
- }).exceptionally(ex -> {
- log.error("[{}] Failed to expire messages for all subscription
on topic {}",
- clientAppId(), topicName, ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- }
+ });
}
- private void internalResetCursorForNonPartitionedTopic(AsyncResponse
asyncResponse, String subName, long timestamp,
- boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
Review comment:
Why remove these two lines(2064~2065)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]