This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e0e15692361c892570ef4fad3dd6ea5b20caa20f
Author: lipenghui <[email protected]>
AuthorDate: Mon Feb 17 18:02:43 2020 +0800

    Avoid getting partitioned topic metadata when the topic name is a 
partition name. (#6339)
    
    Motivation
    
    Avoid fetching partitioned topic metadata when the topic name is already a partition name.
    Currently, if users want to skip all messages for a partitioned topic or 
unload a partitioned topic, the broker fetches the partitioned topic metadata 
many times. When the topic name is already a partition name, it is not 
necessary to fetch the partitioned topic metadata again.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 934 +++++++++++----------
 1 file changed, 493 insertions(+), 441 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4a1021f..b07f018 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -638,19 +638,15 @@ public class PersistentTopicsBase extends AdminResource {
                 zkSync(path);
                 log.info("[{}] Deleted partitioned topic {}", clientAppId(), 
topicName);
                 asyncResponse.resume(Response.noContent().build());
-                return;
             } catch (KeeperException.NoNodeException nne) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Partitioned topic does not exist"));
-                return;
             } catch (KeeperException.BadVersionException e) {
                 log.warn("[{}] Failed to delete partitioned topic {}: 
concurrent modification", clientAppId(),
                         topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, 
"Concurrent modification"));
-                return;
             } catch (Exception e) {
                 log.error("[{}] Failed to delete partitioned topic {}", 
clientAppId(), topicName, e);
                 asyncResponse.resume(new RestException(e));
-                return;
             }
         });
     }
@@ -660,57 +656,65 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-
-        getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).whenComplete((meta, t) -> {
-            if (meta.partitions > 0) {
-                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-
-                for (int i = 0; i < meta.partitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        
futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString()));
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicNamePartition, e);
-                        asyncResponse.resume(new RestException(e));
-                        return;
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).whenComplete((meta, t) -> {
+                if (meta.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                    for (int i = 0; i < meta.partitions; i++) {
+                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                        try {
+                            
futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString()));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicNamePartition, e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
                     }
-                }
 
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        Throwable th = exception.getCause();
-                        if (th instanceof NotFoundException) {
-                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                        } else {
-                            log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicName, exception);
-                            asyncResponse.resume(new RestException(exception));
+                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
+                        if (exception != null) {
+                            Throwable th = exception.getCause();
+                            if (th instanceof NotFoundException) {
+                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
+                            } else {
+                                log.error("[{}] Failed to unload topic {}", 
clientAppId(), topicName, exception);
+                                asyncResponse.resume(new 
RestException(exception));
+                            }
+                            return null;
                         }
+
+                        asyncResponse.resume(Response.noContent().build());
                         return null;
-                    }
+                    });
+                } else {
+                    internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
+                }
+            }).exceptionally(t -> {
+                Throwable th = t.getCause();
+                asyncResponse.resume(new RestException(th));
+                return null;
+            });
+        }
+    }
 
-                    asyncResponse.resume(Response.noContent().build());
-                    return null;
-                });
-            } else {
-                validateAdminAccessForTenant(topicName.getTenant());
-                validateTopicOwnership(topicName, authoritative);
+    private void internalUnloadNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
+        validateAdminAccessForTenant(topicName.getTenant());
+        validateTopicOwnership(topicName, authoritative);
 
-                Topic topic = getTopicReference(topicName);
-                topic.close(false).whenComplete((r, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, ex.getMessage(), ex);
-                        asyncResponse.resume(new RestException(ex));
+        Topic topic = getTopicReference(topicName);
+        topic.close(false).whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("[{}] Failed to unload topic {}, {}", clientAppId(), 
topicName, ex.getMessage(), ex);
+                asyncResponse.resume(new RestException(ex));
 
-                    } else {
-                        log.info("[{}] Successfully unloaded topic {}", 
clientAppId(), topicName);
-                        asyncResponse.resume(Response.noContent().build());
-                    }
-                });
+            } else {
+                log.info("[{}] Successfully unloaded topic {}", clientAppId(), 
topicName);
+                asyncResponse.resume(Response.noContent().build());
             }
-        }).exceptionally(t -> {
-            Throwable th = t.getCause();
-            asyncResponse.resume(new RestException(th));
-            return null;
         });
     }
 
@@ -753,25 +757,26 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-
-        final List<String> subscriptions = Lists.newArrayList();
-
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
-            try {
-                // get the subscriptions only from the 1st partition since all 
the other partitions will have the same
-                // subscriptions
-                
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, 
authoritative);
+        } else {
+            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
+            if (partitionMetadata.partitions > 0) {
+                try {
+                    // get the subscriptions only from the 1st partition since 
all the other partitions will have the same
+                    // subscriptions
+                    
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
                         .whenComplete((r, ex) -> {
                             if (ex != null) {
                                 log.warn("[{}] Failed to get list of 
subscriptions for {}: {}", clientAppId(),
-                                        topicName, ex.getMessage());
+                                    topicName, ex.getMessage());
 
                                 if (ex instanceof PulsarAdminException) {
                                     PulsarAdminException pae = 
(PulsarAdminException) ex;
                                     if (pae.getStatusCode() == 
Status.NOT_FOUND.getStatusCode()) {
                                         asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                                "Internal topics have not been 
generated yet"));
+                                            "Internal topics have not been 
generated yet"));
                                         return;
                                     } else {
                                         asyncResponse.resume(new 
RestException(pae));
@@ -782,29 +787,34 @@ public class PersistentTopicsBase extends AdminResource {
                                     return;
                                 }
                             }
-
+                            final List<String> subscriptions = 
Lists.newArrayList();
                             subscriptions.addAll(r);
                             asyncResponse.resume(subscriptions);
                             return;
                         });
-            } catch (Exception e) {
-                log.error("[{}] Failed to get list of subscriptions for {}", 
clientAppId(), topicName, e);
-                asyncResponse.resume(e);
-                return;
+                } catch (Exception e) {
+                    log.error("[{}] Failed to get list of subscriptions for 
{}", clientAppId(), topicName, e);
+                    asyncResponse.resume(e);
+                    return;
+                }
+            } else {
+                internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, 
authoritative);
             }
-        } else {
-            validateAdminOperationOnTopic(authoritative);
-            Topic topic = getTopicReference(topicName);
+        }
+    }
 
-            try {
-                topic.getSubscriptions().forEach((subName, sub) -> 
subscriptions.add(subName));
-                asyncResponse.resume(subscriptions);
-                return;
-            } catch (Exception e) {
-                log.error("[{}] Failed to get list of subscriptions for {}", 
clientAppId(), topicName, e);
-                asyncResponse.resume(new RestException(e));
-                return;
-            }
+    private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
+        validateAdminOperationOnTopic(authoritative);
+        Topic topic = getTopicReference(topicName);
+        try {
+            final List<String> subscriptions = Lists.newArrayList();
+            topic.getSubscriptions().forEach((subName, sub) -> 
subscriptions.add(subName));
+            asyncResponse.resume(subscriptions);
+            return;
+        } catch (Exception e) {
+            log.error("[{}] Failed to get list of subscriptions for {}", 
clientAppId(), topicName, e);
+            asyncResponse.resume(new RestException(e));
+            return;
         }
     }
 
@@ -952,67 +962,72 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
-            final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
+        } else {
+            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
+            if (partitionMetadata.partitions > 0) {
+                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-            for (int i = 0; i < partitionMetadata.partitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                try {
-                    futures.add(pulsar().getAdminClient().topics()
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        futures.add(pulsar().getAdminClient().topics()
                             
.deleteSubscriptionAsync(topicNamePartition.toString(), subName));
-                } catch (Exception e) {
-                    log.error("[{}] Failed to delete subscription {} {}", 
clientAppId(), topicNamePartition, subName,
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to delete subscription {} {}", 
clientAppId(), topicNamePartition, subName,
                             e);
-                    asyncResponse.resume(new RestException(e));
-                    return;
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
                 }
-            }
 
-            FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                if (exception != null) {
-                    Throwable t = exception.getCause();
-                    if (t instanceof NotFoundException) {
-                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                        return null;
-                    } else if (t instanceof PreconditionFailedException) {
-                        asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                    if (exception != null) {
+                        Throwable t = exception.getCause();
+                        if (t instanceof NotFoundException) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                            return null;
+                        } else if (t instanceof PreconditionFailedException) {
+                            asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
                                 "Subscription has active connected 
consumers"));
-                        return null;
-                    } else {
-                        log.error("[{}] Failed to delete subscription {} {}", 
clientAppId(), topicName, subName, t);
-                        asyncResponse.resume(new RestException(t));
-                        return null;
+                            return null;
+                        } else {
+                            log.error("[{}] Failed to delete subscription {} 
{}", clientAppId(), topicName, subName, t);
+                            asyncResponse.resume(new RestException(t));
+                            return null;
+                        }
                     }
-                }
 
-                asyncResponse.resume(Response.noContent().build());
-                return null;
-            });
-        } else {
-            validateAdminAccessForSubscriber(subName, authoritative);
-            Topic topic = getTopicReference(topicName);
-            try {
-                Subscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
-                sub.delete().get();
-                log.info("[{}][{}] Deleted subscription {}", clientAppId(), 
topicName, subName);
-                asyncResponse.resume(Response.noContent().build());
-                return;
-            } catch (Exception e) {
-                Throwable t = e.getCause();
-                if (e instanceof NullPointerException) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                    return;
-                } else if (t instanceof SubscriptionBusyException) {
-                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                            "Subscription has active connected consumers"));
-                    return;
-                } else {
-                    log.error("[{}] Failed to delete subscription {} {}", 
clientAppId(), topicName, subName, e);
-                    asyncResponse.resume(new RestException(t));
-                    return;
-                }
+                    asyncResponse.resume(Response.noContent().build());
+                    return null;
+                });
+            } else {
+                
internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, 
authoritative);
+            }
+        }
+    }
+
+    private void 
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, 
String subName, boolean authoritative) {
+        validateAdminAccessForSubscriber(subName, authoritative);
+        Topic topic = getTopicReference(topicName);
+        try {
+            Subscription sub = topic.getSubscription(subName);
+            checkNotNull(sub);
+            sub.delete().get();
+            log.info("[{}][{}] Deleted subscription {}", clientAppId(), 
topicName, subName);
+            asyncResponse.resume(Response.noContent().build());
+        } catch (Exception e) {
+            Throwable t = e.getCause();
+            if (e instanceof NullPointerException) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
+            } else if (t instanceof SubscriptionBusyException) {
+                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                    "Subscription has active connected consumers"));
+            } else {
+                log.error("[{}] Failed to delete subscription {} {}", 
clientAppId(), topicName, subName, e);
+                asyncResponse.resume(new RestException(t));
             }
         }
     }
@@ -1021,67 +1036,76 @@ public class PersistentTopicsBase extends AdminResource 
{
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
-            final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
+        } else {
+            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
+            if (partitionMetadata.partitions > 0) {
+                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-            for (int i = 0; i < partitionMetadata.partitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                try {
-                    
futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        
futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
                             subName));
-                } catch (Exception e) {
-                    log.error("[{}] Failed to skip all messages {} {}", 
clientAppId(), topicNamePartition, subName, e);
-                    asyncResponse.resume(new RestException(e));
-                    return;
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to skip all messages {} {}", 
clientAppId(), topicNamePartition, subName, e);
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
                 }
-            }
 
-            FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                if (exception != null) {
-                    Throwable t = exception.getCause();
-                    if (t instanceof NotFoundException) {
-                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                        return null;
-                    } else {
-                        log.error("[{}] Failed to skip all messages {} {}", 
clientAppId(), topicName, subName, t);
-                        asyncResponse.resume(new RestException(t));
-                        return null;
+                FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                    if (exception != null) {
+                        Throwable t = exception.getCause();
+                        if (t instanceof NotFoundException) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                            return null;
+                        } else {
+                            log.error("[{}] Failed to skip all messages {} 
{}", clientAppId(), topicName, subName, t);
+                            asyncResponse.resume(new RestException(t));
+                            return null;
+                        }
                     }
-                }
 
-                asyncResponse.resume(Response.noContent().build());
-                return null;
-            });
-        } else {
-            validateAdminAccessForSubscriber(subName, authoritative);
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
-            BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
-                if (ex != null) {
-                    asyncResponse.resume(new RestException(ex));
-                    log.error("[{}] Failed to skip all messages {} {}", 
clientAppId(), topicName, subName, ex);
-                } else {
                     asyncResponse.resume(Response.noContent().build());
-                    log.info("[{}] Cleared backlog on {} {}", clientAppId(), 
topicName, subName);
-                }
-            };
-            try {
-                if (subName.startsWith(topic.getReplicatorPrefix())) {
-                    String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                    PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
-                    checkNotNull(repl);
-                    repl.clearBacklog().whenComplete(biConsumer);
-                } else {
-                    PersistentSubscription sub = 
topic.getSubscription(subName);
-                    checkNotNull(sub);
-                    sub.clearBacklog().whenComplete(biConsumer);
-                }
-            } catch (Exception e) {
-                if (e instanceof NullPointerException) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                } else {
-                    asyncResponse.resume(new RestException(e));
-                }
+                    return null;
+                });
+            } else {
+                internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
+            }
+        }
+    }
+
+    private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse 
asyncResponse, String subName, boolean authoritative) {
+        validateAdminAccessForSubscriber(subName, authoritative);
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
+            if (ex != null) {
+                asyncResponse.resume(new RestException(ex));
+                log.error("[{}] Failed to skip all messages {} {}", 
clientAppId(), topicName, subName, ex);
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+                log.info("[{}] Cleared backlog on {} {}", clientAppId(), 
topicName, subName);
+            }
+        };
+        try {
+            if (subName.startsWith(topic.getReplicatorPrefix())) {
+                String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
+                checkNotNull(repl);
+                repl.clearBacklog().whenComplete(biConsumer);
+            } else {
+                PersistentSubscription sub = topic.getSubscription(subName);
+                checkNotNull(sub);
+                sub.clearBacklog().whenComplete(biConsumer);
+            }
+        } catch (Exception e) {
+            if (e instanceof NullPointerException) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
+            } else {
+                asyncResponse.resume(new RestException(e));
             }
         }
     }
@@ -1122,73 +1146,82 @@ public class PersistentTopicsBase extends AdminResource 
{
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
-            final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, 
expireTimeInSeconds, authoritative);
+        } else {
+            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
+            if (partitionMetadata.partitions > 0) {
+                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-            // expire messages for each partition topic
-            for (int i = 0; i < partitionMetadata.partitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                try {
-                    
futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
+                // expire messages for each partition topic
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        
futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
                             topicNamePartition.toString(), 
expireTimeInSeconds));
-                } catch (Exception e) {
-                    log.error("[{}] Failed to expire messages up to {} on {}", 
clientAppId(), expireTimeInSeconds,
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
                             topicNamePartition, e);
-                    asyncResponse.resume(new RestException(e));
-                    return;
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
                 }
-            }
 
-            FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                if (exception != null) {
-                    Throwable t = exception.getCause();
-                    log.error("[{}] Failed to expire messages up to {} on {}", 
clientAppId(), expireTimeInSeconds,
+                FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                    if (exception != null) {
+                        Throwable t = exception.getCause();
+                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
                             topicName, t);
-                    asyncResponse.resume(new RestException(t));
-                    return null;
-                }
+                        asyncResponse.resume(new RestException(t));
+                        return null;
+                    }
 
-                asyncResponse.resume(Response.noContent().build());
-                return null;
-            });
-        } else {
-            // validate ownership and redirect if current broker is not owner
-            validateAdminOperationOnTopic(authoritative);
+                    asyncResponse.resume(Response.noContent().build());
+                    return null;
+                });
+            } else {
+                
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, 
expireTimeInSeconds, authoritative);
+            }
+        }
+    }
 
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
-            final AtomicReference<Throwable> exception = new 
AtomicReference<>();
+    private void 
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse, int expireTimeInSeconds,
+            boolean authoritative) {
+        // validate ownership and redirect if current broker is not owner
+        validateAdminOperationOnTopic(authoritative);
 
-            topic.getReplicators().forEach((subName, replicator) -> {
-                try {
-                    internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-                } catch (Throwable t) {
-                    exception.set(t);
-                }
-            });
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        final AtomicReference<Throwable> exception = new AtomicReference<>();
 
-            topic.getSubscriptions().forEach((subName, subscriber) -> {
-                try {
-                    internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
-                } catch (Throwable t) {
-                    exception.set(t);
-                }
-            });
+        topic.getReplicators().forEach((subName, replicator) -> {
+            try {
+                internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
+            } catch (Throwable t) {
+                exception.set(t);
+            }
+        });
 
-            if (exception.get() != null) {
-                if (exception.get() instanceof WebApplicationException) {
-                    WebApplicationException wae = (WebApplicationException) 
exception.get();
-                    asyncResponse.resume(wae);
-                    return;
-                } else {
-                    asyncResponse.resume(new RestException(exception.get()));
-                    return;
-                }
+        topic.getSubscriptions().forEach((subName, subscriber) -> {
+            try {
+                internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
+            } catch (Throwable t) {
+                exception.set(t);
             }
+        });
 
-            asyncResponse.resume(Response.noContent().build());
-            return;
+        if (exception.get() != null) {
+            if (exception.get() instanceof WebApplicationException) {
+                WebApplicationException wae = (WebApplicationException) 
exception.get();
+                asyncResponse.resume(wae);
+                return;
+            } else {
+                asyncResponse.resume(new RestException(exception.get()));
+                return;
+            }
         }
+
+        asyncResponse.resume(Response.noContent().build());
     }
 
     protected void internalResetCursor(AsyncResponse asyncResponse, String 
subName, long timestamp,
@@ -1196,108 +1229,111 @@ public class PersistentTopicsBase extends 
AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        final int numPartitions = partitionMetadata.partitions;
-        if (numPartitions > 0) {
-            final CompletableFuture<Void> future = new CompletableFuture<>();
-            final AtomicInteger count = new AtomicInteger(numPartitions);
-            final AtomicInteger failureCount = new AtomicInteger(0);
-            final AtomicReference<Throwable> partitionException = new 
AtomicReference<>();
-
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                try {
-                    pulsar().getAdminClient().topics()
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            internalResetCursorForNonPartitionedTopic(asyncResponse, subName, 
timestamp, authoritative);
+        } else {
+            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
+            final int numPartitions = partitionMetadata.partitions;
+            if (numPartitions > 0) {
+                final CompletableFuture<Void> future = new 
CompletableFuture<>();
+                final AtomicInteger count = new AtomicInteger(numPartitions);
+                final AtomicInteger failureCount = new AtomicInteger(0);
+                final AtomicReference<Throwable> partitionException = new 
AtomicReference<>();
+
+                for (int i = 0; i < numPartitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        pulsar().getAdminClient().topics()
                             .resetCursorAsync(topicNamePartition.toString(), 
subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof 
PreconditionFailedException) {
-                                        // throw the last exception if all 
partitions get this error
-                                        // any other exception on partition is 
reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset 
cursor on subscription {} to time {}",
-                                                clientAppId(), 
topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
-                                    }
+                            if (ex != null) {
+                                if (ex instanceof PreconditionFailedException) 
{
+                                    // throw the last exception if all 
partitions get this error
+                                    // any other exception on partition is 
reported back to user
+                                    failureCount.incrementAndGet();
+                                    partitionException.set(ex);
+                                } else {
+                                    log.warn("[{}] [{}] Failed to reset cursor 
on subscription {} to time {}",
+                                        clientAppId(), topicNamePartition, 
subName, timestamp, ex);
+                                    future.completeExceptionally(ex);
+                                    return null;
                                 }
+                            }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                            if (count.decrementAndGet() == 0) {
+                                future.complete(null);
+                            }
 
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.warn("[{}] [{}] Failed to reset cursor on subscription 
{} to time {}", clientAppId(),
+                            return null;
+                        });
+                    } catch (Exception e) {
+                        log.warn("[{}] [{}] Failed to reset cursor on 
subscription {} to time {}", clientAppId(),
                             topicNamePartition, subName, timestamp, e);
-                    future.completeExceptionally(e);
+                        future.completeExceptionally(e);
+                    }
                 }
-            }
 
-            future.whenComplete((r, ex) -> {
-                if (ex != null) {
-                    if (ex instanceof PulsarAdminException) {
-                        asyncResponse.resume(new 
RestException((PulsarAdminException) ex));
-                        return;
-                    } else {
-                        asyncResponse.resume(new RestException(ex));
-                        return;
+                future.whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        if (ex instanceof PulsarAdminException) {
+                            asyncResponse.resume(new 
RestException((PulsarAdminException) ex));
+                            return;
+                        } else {
+                            asyncResponse.resume(new RestException(ex));
+                            return;
+                        }
                     }
-                }
 
-                // report an error to user if unable to reset for all 
partitions
-                if (failureCount.get() == numPartitions) {
-                    log.warn("[{}] [{}] Failed to reset cursor on subscription 
{} to time {}", clientAppId(), topicName,
+                    // report an error to user if unable to reset for all 
partitions
+                    if (failureCount.get() == numPartitions) {
+                        log.warn("[{}] [{}] Failed to reset cursor on 
subscription {} to time {}", clientAppId(), topicName,
                             subName, timestamp, partitionException.get());
-                    asyncResponse.resume(
+                        asyncResponse.resume(
                             new RestException(Status.PRECONDITION_FAILED, 
partitionException.get().getMessage()));
-                    return;
-                } else if (failureCount.get() > 0) {
-                    log.warn("[{}] [{}] Partial errors for reset cursor on 
subscription {} to time {}", clientAppId(),
+                        return;
+                    } else if (failureCount.get() > 0) {
+                        log.warn("[{}] [{}] Partial errors for reset cursor on 
subscription {} to time {}", clientAppId(),
                             topicName, subName, timestamp, 
partitionException.get());
-                }
+                    }
 
-                asyncResponse.resume(Response.noContent().build());
-                return;
-            });
-        } else {
-            validateAdminAccessForSubscriber(subName, authoritative);
-            log.info("[{}] [{}] Received reset cursor on subscription {} to 
time {}", clientAppId(), topicName, subName,
-                    timestamp);
-            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
-                return;
+                    asyncResponse.resume(Response.noContent().build());
+                });
+            } else {
+                internalResetCursorForNonPartitionedTopic(asyncResponse, 
subName, timestamp, authoritative);
             }
-            try {
-                PersistentSubscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
-                sub.resetCursor(timestamp).get();
-                log.info("[{}] [{}] Reset cursor on subscription {} to time 
{}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-                return;
-            } catch (Exception e) {
-                Throwable t = e.getCause();
-                log.warn("[{}] [{}] Failed to reset cursor on subscription {} 
to time {}", clientAppId(), topicName,
-                        subName, timestamp, e);
-                if (e instanceof NullPointerException) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
-                    return;
-                } else if (e instanceof NotAllowedException) {
-                    asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
-                    return;
-                } else if (t instanceof SubscriptionInvalidCursorPosition) {
-                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                            "Unable to find position for timestamp specified 
-" + t.getMessage()));
-                    return;
-                } else {
-                    asyncResponse.resume(new RestException(e));
-                    return;
-                }
+        }
+    }
+
+    private void internalResetCursorForNonPartitionedTopic(AsyncResponse 
asyncResponse, String subName, long timestamp,
+                                       boolean authoritative) {
+        validateAdminAccessForSubscriber(subName, authoritative);
+        log.info("[{}] [{}] Received reset cursor on subscription {} to time 
{}", clientAppId(), topicName, subName,
+            timestamp);
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        if (topic == null) {
+            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic 
not found"));
+            return;
+        }
+        try {
+            PersistentSubscription sub = topic.getSubscription(subName);
+            checkNotNull(sub);
+            sub.resetCursor(timestamp).get();
+            log.info("[{}] [{}] Reset cursor on subscription {} to time {}", 
clientAppId(), topicName, subName,
+                timestamp);
+            asyncResponse.resume(Response.noContent().build());
+        } catch (Exception e) {
+            Throwable t = e.getCause();
+            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to 
time {}", clientAppId(), topicName,
+                subName, timestamp, e);
+            if (e instanceof NullPointerException) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
+            } else if (e instanceof NotAllowedException) {
+                asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
+            } else if (t instanceof SubscriptionInvalidCursorPosition) {
+                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                    "Unable to find position for timestamp specified -" + 
t.getMessage()));
+            } else {
+                asyncResponse.resume(new RestException(e));
             }
         }
     }
@@ -1310,27 +1346,30 @@ public class PersistentTopicsBase extends AdminResource 
{
         final MessageIdImpl targetMessageId = messageId == null ? 
(MessageIdImpl) MessageId.earliest : messageId;
         log.info("[{}][{}] Creating subscription {} at message id {}", 
clientAppId(), topicName, subscriptionName,
                 targetMessageId);
-
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        final int numPartitions = partitionMetadata.partitions;
-        if (numPartitions > 0) {
-            final CompletableFuture<Void> future = new CompletableFuture<>();
-            final AtomicInteger count = new AtomicInteger(numPartitions);
-            final AtomicInteger failureCount = new AtomicInteger(0);
-            final AtomicReference<Throwable> partitionException = new 
AtomicReference<>();
-
-            // Create the subscription on each partition
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                try {
-                    pulsar().getAdminClient().topics()
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, 
subscriptionName, targetMessageId, authoritative, replicated);
+        } else {
+            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
+            final int numPartitions = partitionMetadata.partitions;
+            if (numPartitions > 0) {
+                final CompletableFuture<Void> future = new 
CompletableFuture<>();
+                final AtomicInteger count = new AtomicInteger(numPartitions);
+                final AtomicInteger failureCount = new AtomicInteger(0);
+                final AtomicReference<Throwable> partitionException = new 
AtomicReference<>();
+
+                // Create the subscription on each partition
+                for (int i = 0; i < numPartitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        pulsar().getAdminClient().topics()
                             
.createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, 
targetMessageId)
                             .handle((r, ex) -> {
                                 if (ex != null) {
                                     // fail the operation on unknown exception 
or if all the partitioned failed due to
                                     // subscription-already-exist
                                     if (failureCount.incrementAndGet() == 
numPartitions
-                                            || !(ex instanceof 
PulsarAdminException.ConflictException)) {
+                                        || !(ex instanceof 
PulsarAdminException.ConflictException)) {
                                         partitionException.set(ex);
                                     }
                                 }
@@ -1341,75 +1380,79 @@ public class PersistentTopicsBase extends AdminResource 
{
 
                                 return null;
                             });
-                } catch (Exception e) {
-                    log.warn("[{}] [{}] Failed to create subscription {} at 
message id {}", clientAppId(),
+                    } catch (Exception e) {
+                        log.warn("[{}] [{}] Failed to create subscription {} 
at message id {}", clientAppId(),
                             topicNamePartition, subscriptionName, 
targetMessageId, e);
-                    future.completeExceptionally(e);
+                        future.completeExceptionally(e);
+                    }
                 }
-            }
 
-            future.whenComplete((r, ex) -> {
-                if (ex != null) {
-                    if (ex instanceof PulsarAdminException) {
-                        asyncResponse.resume(new 
RestException((PulsarAdminException) ex));
-                        return;
-                    } else {
-                        asyncResponse.resume(new RestException(ex));
-                        return;
+                future.whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        if (ex instanceof PulsarAdminException) {
+                            asyncResponse.resume(new 
RestException((PulsarAdminException) ex));
+                            return;
+                        } else {
+                            asyncResponse.resume(new RestException(ex));
+                            return;
+                        }
                     }
-                }
 
-                if (partitionException.get() != null) {
-                    log.warn("[{}] [{}] Failed to create subscription {} at 
message id {}", clientAppId(), topicName,
+                    if (partitionException.get() != null) {
+                        log.warn("[{}] [{}] Failed to create subscription {} 
at message id {}", clientAppId(), topicName,
                             subscriptionName, targetMessageId, 
partitionException.get());
-                    if (partitionException.get() instanceof 
PulsarAdminException) {
-                        asyncResponse.resume(new 
RestException((PulsarAdminException) partitionException.get()));
-                        return;
-                    } else {
-                        asyncResponse.resume(new 
RestException(partitionException.get()));
-                        return;
+                        if (partitionException.get() instanceof 
PulsarAdminException) {
+                            asyncResponse.resume(new 
RestException((PulsarAdminException) partitionException.get()));
+                            return;
+                        } else {
+                            asyncResponse.resume(new 
RestException(partitionException.get()));
+                            return;
+                        }
                     }
-                }
-
-                asyncResponse.resume(Response.noContent().build());
-                return;
-            });
-        } else {
-            validateAdminAccessForSubscriber(subscriptionName, authoritative);
-
-            PersistentTopic topic = (PersistentTopic) 
getOrCreateTopic(topicName);
 
-            if (topic.getSubscriptions().containsKey(subscriptionName)) {
-                asyncResponse.resume(new RestException(Status.CONFLICT, 
"Subscription already exists for topic"));
-                return;
+                    asyncResponse.resume(Response.noContent().build());
+                });
+            } else {
+                
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, 
subscriptionName, targetMessageId, authoritative, replicated);
             }
+        }
+    }
 
-            try {
-                PersistentSubscription subscription = (PersistentSubscription) 
topic
-                        .createSubscription(subscriptionName, 
InitialPosition.Latest, replicated).get();
-                // Mark the cursor as "inactive" as it was created without a 
real consumer connected
-                subscription.deactivateCursor();
-                
subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), 
targetMessageId.getEntryId()))
-                        .get();
-            } catch (Throwable e) {
-                Throwable t = e.getCause();
-                log.warn("[{}] [{}] Failed to create subscription {} at 
message id {}", clientAppId(), topicName,
-                        subscriptionName, targetMessageId, e);
-                if (t instanceof SubscriptionInvalidCursorPosition) {
-                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                            "Unable to find position for position specified: " 
+ t.getMessage()));
-                    return;
-                } else {
-                    asyncResponse.resume(new RestException(e));
-                    return;
-                }
-            }
+    private void 
internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, 
String subscriptionName,
+              MessageIdImpl targetMessageId, boolean authoritative, boolean 
replicated) {
+        validateAdminAccessForSubscriber(subscriptionName, authoritative);
 
-            log.info("[{}][{}] Successfully created subscription {} at message 
id {}", clientAppId(), topicName,
-                    subscriptionName, targetMessageId);
-            asyncResponse.resume(Response.noContent().build());
+        PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName);
+
+        if (topic.getSubscriptions().containsKey(subscriptionName)) {
+            asyncResponse.resume(new RestException(Status.CONFLICT, 
"Subscription already exists for topic"));
             return;
         }
+
+        try {
+            PersistentSubscription subscription = (PersistentSubscription) 
topic
+                .createSubscription(subscriptionName, InitialPosition.Latest, 
replicated).get();
+            // Mark the cursor as "inactive" as it was created without a real 
consumer connected
+            subscription.deactivateCursor();
+            
subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), 
targetMessageId.getEntryId()))
+                .get();
+        } catch (Throwable e) {
+            Throwable t = e.getCause();
+            log.warn("[{}] [{}] Failed to create subscription {} at message id 
{}", clientAppId(), topicName,
+                subscriptionName, targetMessageId, e);
+            if (t instanceof SubscriptionInvalidCursorPosition) {
+                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                    "Unable to find position for position specified: " + 
t.getMessage()));
+                return;
+            } else {
+                asyncResponse.resume(new RestException(e));
+                return;
+            }
+        }
+
+        log.info("[{}][{}] Successfully created subscription {} at message id 
{}", clientAppId(), topicName,
+            subscriptionName, targetMessageId);
+        asyncResponse.resume(Response.noContent().build());
     }
 
     protected void internalResetCursorOnPosition(String subName, boolean 
authoritative, MessageIdImpl messageId) {
@@ -1418,10 +1461,8 @@ public class PersistentTopicsBase extends AdminResource {
         }
         log.info("[{}][{}] received reset cursor on subscription {} to 
position {}", clientAppId(), topicName,
                 subName, messageId);
-
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-
-        if (partitionMetadata.partitions > 0) {
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
             log.warn("[{}] Not supported operation on partitioned-topic {} 
{}", clientAppId(), topicName,
                     subName);
             throw new RestException(Status.METHOD_NOT_ALLOWED,
@@ -1458,8 +1499,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages 
on a partitioned topic is not allowed");
         }
         validateAdminAccessForSubscriber(subName, authoritative);
@@ -1602,43 +1643,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
-            final List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
-            // expire messages for each partition topic
-            for (int i = 0; i < partitionMetadata.partitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                try {
-                    
futures.add(pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(),
-                            subName, expireTimeInSeconds));
-                } catch (Exception e) {
-                    log.error("[{}] Failed to expire messages up to {} on {}", 
clientAppId(), expireTimeInSeconds,
-                            topicNamePartition, e);
-                    asyncResponse.resume(new RestException(e));
-                    return;
-                }
-            }
-
-            FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                if (exception != null) {
-                    Throwable t = exception.getCause();
-                    if (t instanceof NotFoundException) {
-                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
-                        return null;
-                    } else {
-                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
-                                topicName, t);
-                        asyncResponse.resume(new RestException(t));
-                        return null;
-                    }
-                }
-
-                asyncResponse.resume(Response.noContent().build());
-                return null;
-            });
-        } else {
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
             try {
                 internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
             } catch (WebApplicationException wae) {
@@ -1649,7 +1655,54 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
             asyncResponse.resume(Response.noContent().build());
-            return;
+        } else {
+            PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
+            if (partitionMetadata.partitions > 0) {
+                final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                // expire messages for each partition topic
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    TopicName topicNamePartition = topicName.getPartition(i);
+                    try {
+                        
futures.add(pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(),
+                            subName, expireTimeInSeconds));
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to expire messages up to {} on 
{}", clientAppId(), expireTimeInSeconds,
+                            topicNamePartition, e);
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+                }
+
+                FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                    if (exception != null) {
+                        Throwable t = exception.getCause();
+                        if (t instanceof NotFoundException) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Subscription not found"));
+                            return null;
+                        } else {
+                            log.error("[{}] Failed to expire messages up to {} 
on {}", clientAppId(), expireTimeInSeconds,
+                                topicName, t);
+                            asyncResponse.resume(new RestException(t));
+                            return null;
+                        }
+                    }
+
+                    asyncResponse.resume(Response.noContent().build());
+                    return null;
+                });
+            } else {
+                try {
+                    internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
+                } catch (WebApplicationException wae) {
+                    asyncResponse.resume(wae);
+                    return;
+                } catch (Exception e) {
+                    asyncResponse.resume(new RestException(e));
+                    return;
+                }
+                asyncResponse.resume(Response.noContent().build());
+            }
         }
     }
 
@@ -1658,9 +1711,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
             String msg = "This method should not be called for partitioned 
topic";
             log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
             throw new IllegalStateException(msg);

Reply via email to