Technoboy- commented on a change in pull request #13880:
URL: https://github.com/apache/pulsar/pull/13880#discussion_r792472739
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -3317,131 +3335,184 @@ protected void
internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
protected void internalExpireMessagesByTimestamp(AsyncResponse
asyncResponse, String subName,
int expireTimeInSeconds,
boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (topicName.isPartitioned()) {
- try {
- internalExpireMessagesByTimestampForSinglePartition(subName,
expireTimeInSeconds, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- return;
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- asyncResponse.resume(Response.noContent().build());
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
- PartitionedTopicMetadata partitionMetadata =
getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ future = CompletableFuture.completedFuture(null);
+ }
+ future.thenAccept(__ ->
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenAccept(partitionMetadata -> {
+ if (topicName.isPartitioned()) {
+
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
subName,
+ expireTimeInSeconds, authoritative)
+ .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(e -> {
+ Throwable cause = e.getCause();
+ log.error("[{}] Failed to expire
messages up to {} on {}", clientAppId(),
+ expireTimeInSeconds,
topicName, cause);
+
resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
+ } else {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- // expire messages for each partition topic
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- futures.add(pulsar()
- .getAdminClient()
- .topics()
-
.expireMessagesAsync(topicNamePartition.toString(),
- subName, expireTimeInSeconds));
- } catch (Exception e) {
- log.error("[{}] Failed to expire messages up to {} on
{}", clientAppId(), expireTimeInSeconds,
- topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
+ // expire messages for each partition topic
+ for (int i = 0; i <
partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+
.expireMessagesAsync(topicNamePartition.toString(),
+ subName,
expireTimeInSeconds));
+ } catch (Exception e) {
+ log.error("[{}] Failed to expire
messages up to {} on {}", clientAppId(),
+ expireTimeInSeconds,
topicNamePartition, e);
+ asyncResponse.resume(new
RestException(e));
+ return;
+ }
+ }
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Subscription not found"));
- return null;
- } else {
- log.error("[{}] Failed to expire messages up to {}
on {}",
- clientAppId(), expireTimeInSeconds,
- topicName, t);
- asyncResponse.resume(new RestException(t));
- return null;
+ FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+ "Subscription not found"));
+ return null;
+ } else {
+ log.error("[{}] Failed to expire
messages up to {} on {}",
+ clientAppId(),
expireTimeInSeconds,
+ topicName, t);
+ asyncResponse.resume(new
RestException(t));
+ return null;
+ }
+ }
+
asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
subName,
+ expireTimeInSeconds, authoritative)
+ .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(e -> {
+ Throwable cause = e.getCause();
+ log.error("[{}] Failed to expire
messages up to {} on {}", clientAppId(),
+ expireTimeInSeconds,
topicName, cause);
+
resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
+ }
}
- }
+ }).exceptionally(e -> {
+ Throwable cause = e.getCause();
+ log.error("[{}] Failed to expire messages up to {} on
{}", clientAppId(), expireTimeInSeconds,
+ topicName, cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ })).exceptionally(e -> {
+ Throwable cause = e.getCause();
+ log.error("[{}] Failed to validate global namespace
ownership to expire messages up to {} on {}"
+ , clientAppId(), expireTimeInSeconds,
topicName, cause);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
+ }
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
+ // this method used in
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic,
+ private void internalExpireMessagesByTimestampForSinglePartition(String
subName, int expireTimeInSeconds,
+ boolean authoritative) {
+ PartitionedTopicMetadata partitionMetadata =
getPartitionedTopicMetadata(topicName, authoritative, false);
+ try {
+
internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
subName, expireTimeInSeconds,
+ authoritative).join();
+ } catch (CompletionException ce) {
+ if (ce.getCause() instanceof WebApplicationException) {
+ throw (WebApplicationException) ce.getCause();
} else {
- try {
-
internalExpireMessagesByTimestampForSinglePartition(subName,
expireTimeInSeconds, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- return;
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- asyncResponse.resume(Response.noContent().build());
+ throw new RestException(ce.getCause());
}
}
}
- private void internalExpireMessagesByTimestampForSinglePartition(String
subName, int expireTimeInSeconds,
+ private CompletableFuture<Void>
internalExpireMessagesByTimestampForSinglePartitionAsync(
+ PartitionedTopicMetadata partitionMetadata, String subName, int
expireTimeInSeconds,
boolean authoritative) {
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (!topicName.isPartitioned() &&
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+ if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
String msg = "This method should not be called for partitioned
topic";
- log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
- throw new IllegalStateException(msg);
- }
-
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
-
- if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
- log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName, subName);
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Expire messages on a non-persistent topic is not
allowed");
- }
-
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
- try {
- boolean issued;
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
- PersistentReplicator repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
- checkNotNull(repl);
- issued = repl.expireMessages(expireTimeInSeconds);
- } else {
- PersistentSubscription sub = topic.getSubscription(subName);
- checkNotNull(sub);
- issued = sub.expireMessages(expireTimeInSeconds);
- }
- if (issued) {
- log.info("[{}] Message expire started up to {} on {} {}",
clientAppId(), expireTimeInSeconds, topicName,
- subName);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Expire message by timestamp not issued on topic
{} for subscription {} due to ongoing "
- + "message expiration not finished or subscription
almost catch up. If it's performed on "
- + "a partitioned topic operation might succeeded
on other partitions, please check "
- + "stats of individual partition.", topicName,
subName);
+ return FutureUtil.failedFuture(new IllegalStateException(msg));
+ } else {
+ final CompletableFuture<Void> resultFuture = new
CompletableFuture<>();
+ try {
+ validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES)
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ .thenAccept(__ ->
getTopicReferenceAsync(topicName).thenAccept(t -> {
+ if (t == null) {
+ resultFuture.completeExceptionally(new
RestException(Status.NOT_FOUND, "Topic not found"));
+ return;
+ }
+ if (!(t instanceof PersistentTopic)) {
+ resultFuture.completeExceptionally(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Expire messages on a non-persistent topic
is not allowed"));
+ return;
+ }
+ PersistentTopic topic = (PersistentTopic) t;
+ try {
+ boolean issued;
+ if
(subName.startsWith(topic.getReplicatorPrefix())) {
+ String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
+ PersistentReplicator repl =
(PersistentReplicator) topic
+
.getPersistentReplicator(remoteCluster);
+ checkNotNull(repl);
+ issued =
repl.expireMessages(expireTimeInSeconds);
+ } else {
+ PersistentSubscription sub =
topic.getSubscription(subName);
+ checkNotNull(sub);
+ issued =
sub.expireMessages(expireTimeInSeconds);
+ }
+ if (issued) {
+ log.info("[{}] Message expire started up to {}
on {} {}", clientAppId(),
+ expireTimeInSeconds, topicName,
subName);
+ resultFuture.complete(__);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Expire message by timestamp not
issued on topic {} for subscription {} "
+ + "due to ongoing message
expiration not finished or subscription almost"
+ + " catch up. If it's performed on
a partitioned topic operation might "
+ + "succeeded on other partitions,
please check stats of individual "
+ + "partition.", topicName,
subName);
+ }
+ resultFuture.completeExceptionally(new
RestException(Status.CONFLICT, "Expire message "
+ + "by timestamp not issued on topic "
+ topicName + " for subscription "
+ + subName + " due to ongoing message
expiration not finished or subscription "
+ + "almost catch up. If it's performed
on a partitioned topic operation might"
+ + " succeeded on other partitions,
please check stats of individual partition."
+ ));
+ return;
+ }
+ } catch (NullPointerException npe) {
Review comment:
Why do we need to catch the NullPointerException (NPE) here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]