Jason918 commented on a change in pull request #13901:
URL: https://github.com/apache/pulsar/pull/13901#discussion_r790120669
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
}
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String
subName, boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
- clientAppId(), subName, topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalSkipAllMessagesForNonPartitionedTopic(asyncResponse,
subName, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future.thenRun(() -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenRun(() -> validateTopicOperationAsync(topicName,
TopicOperation.SKIP, subName))
+ .thenRun(() -> {
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
authoritative);
+ } else {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar()
- .getAdminClient()
- .topics()
-
.skipAllMessagesAsync(topicNamePartition.toString(),
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+
.skipAllMessagesAsync(topicNamePartition.toString(),
subName));
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages {} {}",
- clientAppId(), topicNamePartition,
subName, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
+ } catch (Exception e) {
+ log.error("[{}] Failed to skip all
messages {} {}",
+ clientAppId(), topicNamePartition,
subName, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Subscription not found"));
- return null;
- } else {
- log.error("[{}] Failed to skip all messages {}
{}",
- clientAppId(), topicName, subName, t);
- asyncResponse.resume(new RestException(t));
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(
+ new
RestException(Status.NOT_FOUND, "Subscription not found"));
+ return null;
+ } else {
+ log.error("[{}] Failed to skip all
messages {} {}",
+ clientAppId(), topicName, subName,
t);
+ asyncResponse.resume(new
RestException(t));
+ return null;
+ }
+ }
+
+
asyncResponse.resume(Response.noContent().build());
return null;
- }
+ });
+ } else {
+
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
authoritative);
}
-
- asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
+ clientAppId(), subName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
- } else {
-
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
}
}).exceptionally(ex -> {
log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
- clientAppId(), subName, topicName, ex);
+ clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
- }
}
- private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse
asyncResponse,
+ private void
internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
String subName,
boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
- PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
- BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
- if (ex != null) {
- asyncResponse.resume(new RestException(ex));
- log.error("[{}] Failed to skip all messages {} {}",
clientAppId(), topicName, subName, ex);
- } else {
- asyncResponse.resume(Response.noContent().build());
- log.info("[{}] Cleared backlog on {} {}", clientAppId(),
topicName, subName);
- }
- };
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
- PersistentReplicator repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
- if (repl == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- repl.clearBacklog().whenComplete(biConsumer);
- } else {
- PersistentSubscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- sub.clearBacklog().whenComplete(biConsumer);
- }
- } catch (WebApplicationException wae) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to skip all messages for subscription
on topic {},"
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenRun(() ->
+ validateTopicOperationAsync(topicName, TopicOperation.SKIP,
subName))
+ .thenRun(() -> {
+ try {
+ PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
+ BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
+ if (ex != null) {
+ asyncResponse.resume(new RestException(ex));
+ log.error("[{}] Failed to skip all messages {}
{}", clientAppId(), topicName, subName, ex);
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ log.info("[{}] Cleared backlog on {} {}",
clientAppId(), topicName, subName);
+ }
+ };
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
+ String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
+ PersistentReplicator repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
+ if (repl == null) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
+ repl.clearBacklog().whenComplete(biConsumer);
+ } else {
+ PersistentSubscription sub =
topic.getSubscription(subName);
+ if (sub == null) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
+ sub.clearBacklog().whenComplete(biConsumer);
+ }
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to skip all messages for
subscription on topic {},"
+ " redirecting to other brokers.",
- clientAppId(), topicName, wae);
- }
- resumeAsyncResponseExceptionally(asyncResponse, wae);
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages for subscription {} on
topic {}",
- clientAppId(), subName, topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- }
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
+ } catch (Exception e) {
+ log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
+ clientAppId(), subName, topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ });
Review comment:
This chain needs to handle exceptions with `exceptionally` here; otherwise, if
one of the validation futures fails, `asyncResponse` is never resumed.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
}
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String
subName, boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
- clientAppId(), subName, topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalSkipAllMessagesForNonPartitionedTopic(asyncResponse,
subName, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future.thenRun(() -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenRun(() -> validateTopicOperationAsync(topicName,
TopicOperation.SKIP, subName))
+ .thenRun(() -> {
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
authoritative);
+ } else {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar()
- .getAdminClient()
- .topics()
-
.skipAllMessagesAsync(topicNamePartition.toString(),
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+
.skipAllMessagesAsync(topicNamePartition.toString(),
subName));
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages {} {}",
- clientAppId(), topicNamePartition,
subName, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
+ } catch (Exception e) {
+ log.error("[{}] Failed to skip all
messages {} {}",
+ clientAppId(), topicNamePartition,
subName, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Subscription not found"));
- return null;
- } else {
- log.error("[{}] Failed to skip all messages {}
{}",
- clientAppId(), topicName, subName, t);
- asyncResponse.resume(new RestException(t));
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(
+ new
RestException(Status.NOT_FOUND, "Subscription not found"));
+ return null;
+ } else {
+ log.error("[{}] Failed to skip all
messages {} {}",
+ clientAppId(), topicName, subName,
t);
+ asyncResponse.resume(new
RestException(t));
+ return null;
+ }
+ }
+
+
asyncResponse.resume(Response.noContent().build());
return null;
- }
+ });
+ } else {
+
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
authoritative);
}
-
- asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
+ clientAppId(), subName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
- } else {
-
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
}
}).exceptionally(ex -> {
log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
- clientAppId(), subName, topicName, ex);
+ clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
- }
}
- private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse
asyncResponse,
+ private void
internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
String subName,
boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
- PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
- BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
- if (ex != null) {
- asyncResponse.resume(new RestException(ex));
- log.error("[{}] Failed to skip all messages {} {}",
clientAppId(), topicName, subName, ex);
- } else {
- asyncResponse.resume(Response.noContent().build());
- log.info("[{}] Cleared backlog on {} {}", clientAppId(),
topicName, subName);
- }
- };
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
- PersistentReplicator repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
- if (repl == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- repl.clearBacklog().whenComplete(biConsumer);
- } else {
- PersistentSubscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
- return;
- }
- sub.clearBacklog().whenComplete(biConsumer);
- }
- } catch (WebApplicationException wae) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Failed to skip all messages for subscription
on topic {},"
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenRun(() ->
+ validateTopicOperationAsync(topicName, TopicOperation.SKIP,
subName))
+ .thenRun(() -> {
+ try {
+ PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
Review comment:
`getTopicReference` is a blocking call, so it should not be invoked inside
this async callback.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
}
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String
subName, boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
- clientAppId(), subName, topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalSkipAllMessagesForNonPartitionedTopic(asyncResponse,
subName, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future.thenRun(() -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenRun(() -> validateTopicOperationAsync(topicName,
TopicOperation.SKIP, subName))
+ .thenRun(() -> {
+ // If the topic name is a partition name, no need to get
partition topic metadata again
+ if (topicName.isPartitioned()) {
+
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
authoritative);
+ } else {
+ getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar()
- .getAdminClient()
- .topics()
-
.skipAllMessagesAsync(topicNamePartition.toString(),
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+
.skipAllMessagesAsync(topicNamePartition.toString(),
subName));
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages {} {}",
- clientAppId(), topicNamePartition,
subName, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
+ } catch (Exception e) {
+ log.error("[{}] Failed to skip all
messages {} {}",
+ clientAppId(), topicNamePartition,
subName, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
- FutureUtil.waitForAll(futures).handle((result, exception)
-> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Subscription not found"));
- return null;
- } else {
- log.error("[{}] Failed to skip all messages {}
{}",
- clientAppId(), topicName, subName, t);
- asyncResponse.resume(new RestException(t));
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(
+ new
RestException(Status.NOT_FOUND, "Subscription not found"));
+ return null;
+ } else {
+ log.error("[{}] Failed to skip all
messages {} {}",
+ clientAppId(), topicName, subName,
t);
+ asyncResponse.resume(new
RestException(t));
+ return null;
+ }
+ }
+
+
asyncResponse.resume(Response.noContent().build());
return null;
- }
+ });
+ } else {
+
internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName,
authoritative);
}
-
- asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
+ clientAppId(), subName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
- } else {
-
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
}
}).exceptionally(ex -> {
log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
Review comment:
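We should use `ex.getCause()` here: an exception that reaches `exceptionally`
from an upstream stage is wrapped in a `CompletionException`, so the real
error is its cause.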
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1651,118 +1651,123 @@ private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes
}
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String
subName, boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to skip all messages for subscription
{} on topic {}",
- clientAppId(), subName, topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.SKIP, subName);
-
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- internalSkipAllMessagesForNonPartitionedTopic(asyncResponse,
subName, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future.thenRun(() -> validateTopicOwnershipAsync(topicName,
authoritative))
Review comment:
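This should use `thenCompose` instead of `thenRun`. `thenRun` discards the
future returned by `validateTopicOwnershipAsync`, so the following stages do
not wait for its result, and we need that result to continue the async
workflow.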
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]