This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 80ad05f  Fix create consumer on partitioned topic while disable topic 
auto creation. (#5572)
80ad05f is described below

commit 80ad05f795f0457b7213caad23c89cbdae50627d
Author: lipenghui <[email protected]>
AuthorDate: Thu Jan 9 10:40:15 2020 +0800

    Fix create consumer on partitioned topic while disable topic auto creation. 
(#5572)
    
    ### Motivation
    
    Currently, disable the topic auto creation will cause consumer create 
failed on a partitioned topic. Since the partitioned topic is already created, 
so we should handle the topic partition create when disable the topic auto 
creation.
    
    ### Modifications
    
    By default, create partitioned topics also try to create all partitions, 
and if create partitions failed, users can use `create-missed-partitions` to 
repair.
    
    If users already have a partitioned topic without created partitions, can 
also use `create-missed-partitions` to repair.
    (cherry picked from commit 602f1c207c220eb53defcedbd03e8732797e1e26)
---
 .../java/org/apache/zookeeper/MockZooKeeper.java   |  22 ++--
 .../apache/pulsar/broker/admin/AdminResource.java  |  55 +++++++-
 .../apache/pulsar/broker/admin/ZkAdminPaths.java   |   4 +
 .../broker/admin/impl/PersistentTopicsBase.java    |  51 +++++---
 .../broker/admin/v1/NonPersistentTopics.java       |   4 +-
 .../broker/admin/v2/NonPersistentTopics.java       |   4 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  26 +++-
 .../pulsar/broker/service/BrokerService.java       |  15 ++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  14 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  20 +--
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   6 +-
 .../pulsar/client/api/PartitionCreationTest.java   | 144 +++++++++++++++++++++
 .../client/impl/PatternTopicsConsumerImplTest.java |   8 +-
 .../org/apache/pulsar/client/admin/Topics.java     |  20 +++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  21 +++
 .../pulsar/client/api/PulsarClientException.java   |   2 +
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  16 +++
 site2/docs/admin-api-partitioned-topics.md         |  28 ++++
 site2/docs/reference-pulsar-admin.md               |  10 ++
 19 files changed, 398 insertions(+), 72 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java 
b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
index c417604..f4160ed 100644
--- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
@@ -425,10 +425,13 @@ public class MockZooKeeper extends ZooKeeper {
                     if (path.length() >= item.length()) {
                         continue;
                     }
-
-                    String child = item.substring(path.length() + 1);
-                    if (!child.contains("/")) {
-                        children.add(child);
+                    String child = item.substring(path.length());
+                    if (child.indexOf("/") == 0) {
+                        child = child.substring(1);
+                        log.debug("child: '{}'", child);
+                        if (!child.contains("/")) {
+                            children.add(child);
+                        }
                     }
                 }
             }
@@ -465,10 +468,13 @@ public class MockZooKeeper extends ZooKeeper {
                 } else if (item.equals(path)) {
                     continue;
                 } else {
-                    String child = item.substring(path.length() + 1);
-                    log.debug("child: '{}'", child);
-                    if (!child.contains("/")) {
-                        children.add(child);
+                    String child = item.substring(path.length());
+                    if (child.indexOf("/") == 0) {
+                        child = child.substring(1);
+                        log.debug("child: '{}'", child);
+                        if (!child.contains("/")) {
+                            children.add(child);
+                        }
                     }
                 }
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 02ebd06..e9d559e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -30,6 +30,8 @@ import java.net.URI;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import javax.servlet.ServletContext;
@@ -66,9 +68,9 @@ import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -111,6 +113,11 @@ public abstract class AdminResource extends 
PulsarWebResource {
         ZkUtils.createFullPathOptimistic(globalZk(), path, content, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
+    protected void zkCreateOptimisticAsync(String path, byte[] content, 
AsyncCallback.StringCallback callback) {
+        ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT, callback, null);
+    }
+
     protected boolean zkPathExists(String path) throws KeeperException, 
InterruptedException {
         Stat stat = globalZk().exists(path, false);
         if (null != stat) {
@@ -119,6 +126,21 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return false;
     }
 
+    protected void zkSync(String path) throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger rc = new 
AtomicInteger(KeeperException.Code.OK.intValue());
+        globalZk().sync(path, (rc2, s, ctx) -> {
+            if (KeeperException.Code.OK.intValue() != rc2) {
+                rc.set(rc2);
+            }
+            latch.countDown();
+        }, null);
+        latch.await();
+        if (KeeperException.Code.OK.intValue() != rc.get()) {
+            throw KeeperException.create(KeeperException.Code.get(rc.get()));
+        }
+    }
+
     /**
      * Get the domain of the topic (whether it's persistent or non-persistent)
      */
@@ -233,6 +255,37 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return namespaces;
     }
 
+    protected void tryCreatePartitionsAsync(int numPartitions) {
+        if (!topicName.isPersistent()) {
+            return;
+        }
+        for (int i = 0; i < numPartitions; i++) {
+            tryCreatePartitionAsync(i);
+        }
+    }
+
+    private void tryCreatePartitionAsync(final int partition) {
+        
zkCreateOptimisticAsync(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)),
 new byte[0],
+            (rc, s, o, s1) -> {
+                if (KeeperException.Code.OK.intValue() == rc) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Topic partition {} created.", 
clientAppId(),
+                            topicName.getPartition(partition));
+                    }
+                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                    log.info("[{}] Topic partition {} is exists, doing 
nothing.", clientAppId(),
+                        topicName.getPartition(partition));
+                } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+                    log.warn("[{}] Fail to create topic partition {} with 
concurrent modification, retry now.",
+                            clientAppId(), topicName.getPartition(partition));
+                    tryCreatePartitionAsync(partition);
+                } else {
+                    log.error("[{}] Fail to create topic partition {}", 
clientAppId(),
+                        topicName.getPartition(partition), 
KeeperException.create(KeeperException.Code.get(rc)));
+                }
+        });
+    }
+
     protected NamespaceName namespaceName;
 
     protected void validateNamespaceName(String property, String namespace) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
index c60422d..95954f6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
@@ -31,6 +31,10 @@ public class ZkAdminPaths {
                 name.getNamespace(), name.getDomain().value(), 
name.getEncodedLocalName());
     }
 
+    public static String managedLedgerPath(TopicName name) {
+        return "/managed-ledgers/" + name.getPersistenceNamingEncoding();
+    }
+
     public static String namespacePoliciesPath(NamespaceName name) {
         return adminPath(POLICIES, name.toString());
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 972e1c3..7d4c8eb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -116,7 +116,6 @@ import org.slf4j.LoggerFactory;
 public class PersistentTopicsBase extends AdminResource {
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopicsBase.class);
 
-    public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
     private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
     private static final String DEPRECATED_CLIENT_VERSION_PREFIX = 
"Pulsar-CPP-v";
     private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = 
Version.forIntegers(1, 21);
@@ -414,8 +413,9 @@ public class PersistentTopicsBase extends AdminResource {
             String path = ZkAdminPaths.partitionedTopicPath(topicName);
             byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
             zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the 
observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+            tryCreatePartitionsAsync(numPartitions);
+            // Sync data to all quorums and the observers
+            zkSync(path);
             log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
         } catch (KeeperException.NodeExistsException e) {
             log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
@@ -533,6 +533,13 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected void internalCreateMissedPartitions() {
+        PartitionedTopicMetadata metadata = 
getPartitionedTopicMetadata(topicName, false, false);
+        if (metadata != null) {
+            tryCreatePartitionsAsync(metadata.partitions);
+        }
+    }
+
     private CompletableFuture<Void> updatePartitionInOtherCluster(int 
numPartitions, Set<String> clusters) {
         List<CompletableFuture<Void>> results = new 
ArrayList<>(clusters.size() -1);
         clusters.forEach(cluster -> {
@@ -620,8 +627,8 @@ public class PersistentTopicsBase extends AdminResource {
             try {
                 globalZk().delete(path, -1);
                 globalZkCache().invalidate(path);
-                // we wait for the data to be synced in all quorums and the 
observers
-                Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+                // Sync data to all quorums and the observers
+                zkSync(path);
                 log.info("[{}] Deleted partitioned topic {}", clientAppId(), 
topicName);
                 asyncResponse.resume(Response.noContent().build());
                 return;
@@ -1839,24 +1846,28 @@ public class PersistentTopicsBase extends AdminResource 
{
             }
 
             
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
 -> {
-                stats.subscriptions.keySet().forEach(subscription -> {
-                    List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
-                    for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
-                        final String topicNamePartition = 
topicName.getPartition(i).toString();
+                if (stats.subscriptions.size() == 0) {
+                    result.complete(null);
+                } else {
+                    stats.subscriptions.keySet().forEach(subscription -> {
+                        List<CompletableFuture<Void>> subscriptionFutures = 
new ArrayList<>();
+                        for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
+                            final String topicNamePartition = 
topicName.getPartition(i).toString();
 
-                        
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
-                                subscription, MessageId.latest));
-                    }
+                            
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
+                                    subscription, MessageId.latest));
+                        }
 
-                    FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
-                        log.info("[{}] Successfully created new partitions 
{}", clientAppId(), topicName);
-                        result.complete(null);
-                    }).exceptionally(ex -> {
-                        log.warn("[{}] Failed to create subscriptions on new 
partitions for {}", clientAppId(), topicName, ex);
-                        result.completeExceptionally(ex);
-                        return null;
+                        FutureUtil.waitForAll(subscriptionFutures).thenRun(() 
-> {
+                            log.info("[{}] Successfully created new partitions 
{}", clientAppId(), topicName);
+                            result.complete(null);
+                        }).exceptionally(ex -> {
+                            log.warn("[{}] Failed to create subscriptions on 
new partitions for {}", clientAppId(), topicName, ex);
+                            result.completeExceptionally(ex);
+                            return null;
+                        });
                     });
-                });
+                }
             }).exceptionally(ex -> {
                 if (ex.getCause() instanceof 
PulsarAdminException.NotFoundException) {
                     // The first partition doesn't exist, so there are 
currently to subscriptions to recreate
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 7667167..0179847 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -144,8 +144,8 @@ public class NonPersistentTopics extends PersistentTopics {
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
             zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the 
observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+            // Sync data to all quorums and the observers
+            zkSync(path);
             log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
         } catch (KeeperException.NodeExistsException e) {
             log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index eeaeb96..a41db33 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -190,8 +190,8 @@ public class NonPersistentTopics extends PersistentTopics {
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
             zkCreateOptimistic(path, data);
-            // we wait for the data to be synced in all quorums and the 
observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+            // Sync data to all quorums and the observers
+            zkSync(path);
             log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
         } catch (KeeperException.NodeExistsException e) {
             log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9eec8ae..08411dd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -246,7 +246,7 @@ public class PersistentTopics extends PersistentTopicsBase {
      */
     @POST
     @Path("/{tenant}/{namespace}/{topic}/partitions")
-    @ApiOperation(value = "Increment partitons of an existing partitioned 
topic.", notes = "It only increments partitions of existing non-global 
partitioned-topic")
+    @ApiOperation(value = "Increment partitions of an existing partitioned 
topic.", notes = "It only increments partitions of existing non-global 
partitioned-topic")
     @ApiResponses(value = {
             @ApiResponse(code = 401, message = "Don't have permission to 
adminisActions to be grantedtrate resources on this tenant"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
@@ -270,6 +270,30 @@ public class PersistentTopics extends PersistentTopicsBase 
{
         internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly);
     }
 
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
+    @ApiOperation(value = "Create missed partitions of an existing partitioned 
topic.", notes = "This is a best-effort operation for create missed partitions 
of existing non-global partitioned-topic and does't throw any exceptions when 
create failed")
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "Don't have permission to 
adminisActions to be grantedtrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant does not exist"),
+            @ApiResponse(code = 409, message = "Partitioned topic does not 
exist"),
+            @ApiResponse(code = 412, message = "Partitioned topic name is 
invalid"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    public void createMissedPartitions(
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic) {
+
+        validatePartitionedTopicName(tenant, namespace, encodedTopic);
+        internalCreateMissedPartitions();
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Get partitioned topic metadata.")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index aa0e198..b9fd0a7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -75,7 +75,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -83,7 +82,6 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -1821,10 +1819,15 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                     partitionedTopicPath(topicName), content,
                     ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, 
path1, ctx, name) -> {
                         if (rc == KeeperException.Code.OK.intValue()) {
-                            // we wait for the data to be synced in all 
quorums and the observers
-                            executor().schedule(
-                                    SafeRunnable.safeRun(() -> 
partitionedTopicFuture.complete(configMetadata)),
-                                    
PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS, 
TimeUnit.MILLISECONDS);
+                            // Sync data to all quorums and the observers
+                            
pulsar.getGlobalZkCache().getZooKeeper().sync(partitionedTopicPath(topicName),
+                                (rc2, path2, ctx2) -> {
+                                    if (rc2 == 
KeeperException.Code.OK.intValue()) {
+                                        
partitionedTopicFuture.complete(configMetadata);
+                                    } else {
+                                        
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc2));
+                                    }
+                            }, null);
                         } else {
                             
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc));
                         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 855a20d..a0b48cf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -815,9 +815,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 4);
 
-        // check if the virtual topic doesn't get created
         List<String> topics = admin.topics().getList("prop-xyz/ns1");
-        assertEquals(topics.size(), 0);
+        assertEquals(topics.size(), 4);
 
         
assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
                 0);
@@ -829,15 +828,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(admin.topics().getPartitionedStats(partitionedTopicName, 
false).partitions.size(),
                 0);
 
-        try {
-            admin.topics().getSubscriptions(partitionedTopicName);
-            fail("should have failed");
-        } catch (PulsarAdminException e) {
-            // ok
-            assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode());
-        } catch (Exception e) {
-            fail(e.getMessage());
-        }
+        List<String> subscriptions = 
admin.topics().getSubscriptions(partitionedTopicName);
+        assertEquals(subscriptions.size(), 0);
 
         // create consumer and subscription
         PulsarClient client = PulsarClient.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index cdeb418..a4cd325 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -159,17 +159,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         // 3) Create the partitioned topic
         persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
testLocalTopicName, 3);
 
-        // 4) Confirm that the topic partitions has not been created yet
-        response = mock(AsyncResponse.class);
-        persistentTopics.getSubscriptions(response, testTenant, testNamespace, 
testLocalTopicName + "-partition-0",
-                true);
-        errorCaptor = ArgumentCaptor.forClass(RestException.class);
-        verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
-        Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
-                Response.Status.NOT_FOUND.getStatusCode());
-        Assert.assertEquals(errorCaptor.getValue().getMessage(), "Topic 
partitions were not yet created");
-
-        // 5) Create a subscription
+        // 4) Create a subscription
         response = mock(AsyncResponse.class);
         persistentTopics.createSubscription(response, testTenant, 
testNamespace, testLocalTopicName, "test", true,
                 (MessageIdImpl) MessageId.earliest, false);
@@ -177,26 +167,26 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
-        // 6) Confirm that the subscription exists
+        // 5) Confirm that the subscription exists
         response = mock(AsyncResponse.class);
         persistentTopics.getSubscriptions(response, testTenant, testNamespace, 
testLocalTopicName + "-partition-0",
                 true);
         verify(response, 
timeout(5000).times(1)).resume(Lists.newArrayList("test"));
 
-        // 7) Delete the subscription
+        // 6) Delete the subscription
         response = mock(AsyncResponse.class);
         persistentTopics.deleteSubscription(response, testTenant, 
testNamespace, testLocalTopicName, "test", true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
-        // 8) Confirm that the subscription does not exist
+        // 7) Confirm that the subscription does not exist
         response = mock(AsyncResponse.class);
         persistentTopics.getSubscriptions(response, testTenant, testNamespace, 
testLocalTopicName + "-partition-0",
                 true);
         verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
 
-        // 9) Delete the partitioned topic
+        // 8) Delete the partitioned topic
         response = mock(AsyncResponse.class);
         persistentTopics.deletePartitionedTopic(response, testTenant, 
testNamespace, testLocalTopicName, true, true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index e3965bd..cf5db2b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -782,9 +782,8 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 4);
 
-        // check if the virtual topic doesn't get created
         List<String> topics = admin.topics().getList("prop-xyz/use/ns1");
-        assertEquals(topics.size(), 0);
+        assertEquals(topics.size(), 4);
 
         assertEquals(
                 
admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds2").partitions,
@@ -809,6 +808,9 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
             fail(e.getMessage());
         }
 
+        List<String> subscriptions = 
admin.topics().getSubscriptions(partitionedTopicName);
+        assertEquals(subscriptions.size(), 1);
+
         Consumer<byte[]> consumer1 = 
client.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub-1")
                 .subscribe();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
new file mode 100644
index 0000000..2647bbb
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.admin.ZkAdminPaths;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
+
+public class PartitionCreationTest extends ProducerConsumerBase {
+
+    @DataProvider(name = "topicDomainProvider")
+    public Object[][] topicDomainProvider() {
+        return new Object[][] {
+                { TopicDomain.persistent },
+                { TopicDomain.non_persistent }
+        };
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setManagedLedgerCacheEvictionFrequency(0.1);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(dataProvider = "topicDomainProvider")
+    public void 
testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain 
domain) throws PulsarAdminException, PulsarClientException {
+        conf.setAllowAutoTopicCreation(false);
+        final String topic = domain.value() + 
"://public/default/testCreateConsumerWhenDisableTopicAutoCreation";
+        admin.topics().createPartitionedTopic(topic, 3);
+        
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
+    }
+
+    @Test(dataProvider = "topicDomainProvider")
+    public void 
testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain
 domain) throws PulsarClientException {
+        conf.setAllowAutoTopicCreation(false);
+        final String topic = domain.value() + 
"://public/default/testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation";
+        try {
+            Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+            if (domain == TopicDomain.persistent) {
+                Assert.fail("should be failed");
+            } else {
+                // passed non persistent topic here since we can not avoid 
auto creation on non persistent topic now.
+                Assert.assertNotNull(consumer);
+            }
+        } catch (PulsarClientException.TopicDoesNotExistException e) {
+            //ok
+        }
+    }
+
+    @Test(dataProvider = "topicDomainProvider")
+    public void 
testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain 
domain) throws PulsarAdminException, PulsarClientException {
+        conf.setAllowAutoTopicCreation(true);
+        final String topic = domain.value() + 
"://public/default/testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation";
+        admin.topics().createPartitionedTopic(topic, 3);
+        
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
+    }
+
+    @Test(dataProvider = "topicDomainProvider")
+    public void 
testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain 
domain) throws PulsarClientException {
+        conf.setAllowAutoTopicCreation(true);
+        final String topic = domain.value() + 
"://public/default/testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation";
+        
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
+    }
+
+    @Test
+    public void 
testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() 
throws PulsarClientException, PulsarAdminException {
+        conf.setAllowAutoTopicCreation(false);
+        final String topic = 
"testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation";
+        admin.topics().createPartitionedTopic(topic, 3);
+        MultiTopicsConsumerImpl<byte[]> consumer = 
(MultiTopicsConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+        Assert.assertNotNull(consumer);
+        Assert.assertEquals(consumer.getConsumers().size(), 3);
+        consumer.close();
+        admin.topics().updatePartitionedTopic(topic, 5);
+        consumer = (MultiTopicsConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+        Assert.assertNotNull(consumer);
+        Assert.assertEquals(consumer.getConsumers().size(), 5);
+    }
+
+    @Test
+    public void testCreateMissedPartitions() throws JsonProcessingException, 
KeeperException, InterruptedException, PulsarAdminException, 
PulsarClientException {
+        conf.setAllowAutoTopicCreation(false);
+        final String topic = "testCreateMissedPartitions";
+        String path = ZkAdminPaths.partitionedTopicPath(TopicName.get(topic));
+        int numPartitions = 3;
+        byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
+        // simulate partitioned topic without partitions
+        
ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), 
path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Consumer<byte[]> consumer = null;
+        try {
+            consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribeAsync().get(3,
 TimeUnit.SECONDS);
+        } catch (Exception e) {
+            //ok here, consumer will create failed with 'Topic does not exist'
+        }
+        Assert.assertNull(consumer);
+        admin.topics().createMissedPartitions(topic);
+        consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+        Assert.assertNotNull(consumer);
+        Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+        
Assert.assertEquals(((MultiTopicsConsumerImpl)consumer).getConsumers().size(), 
3);
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 7413825..a5b20f3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -487,11 +487,11 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
 
-        // 3. verify consumer get methods, to get 0 number of partitions and 
topics.
+        // 3. verify consumer get methods, to get 5 number of partitions and 
topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 0);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 0);
-        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 0);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 5);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 5);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 2);
 
         // 4. create producer
         String messagePredicate = "my-message-" + key + "-";
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 39c007c..c5ec041 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -212,6 +212,16 @@ public interface Topics {
     void createNonPartitionedTopic(String topic) throws PulsarAdminException;
 
     /**
+     * Create missed partitions for partitioned topic.
+     * <p>
+     * When disable topic auto creation, use this method to try create missed 
partitions while
+     * partitions create failed or users already have partitioned topic 
without partitions.
+     *
+     * @param topic partitioned topic name
+     */
+    void createMissedPartitions(String topic) throws PulsarAdminException;
+
+    /**
      * Create a partitioned topic asynchronously.
      * <p>
      * Create a partitioned topic asynchronously. It needs to be called before 
creating a producer for a partitioned
@@ -234,6 +244,16 @@ public interface Topics {
     CompletableFuture<Void> createNonPartitionedTopicAsync(String topic);
 
     /**
+     * Create missed partitions for partitioned topic asynchronously.
+     * <p>
+     * When disable topic auto creation, use this method to try create missed 
partitions while
+     * partitions create failed or users already have partitioned topic 
without partitions.
+     *
+     * @param topic partitioned topic name
+     */
+    CompletableFuture<Void> createMissedPartitionsAsync(String topic);
+
+    /**
      * Update number of partitions of a non-global partitioned topic.
      * <p>
      * It requires partitioned-topic to be already exist and number of new 
partitions must be greater than existing
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 3c31845..75c3d59 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -225,6 +225,20 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public void createMissedPartitions(String topic) throws 
PulsarAdminException {
+        try {
+            createMissedPartitionsAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> createNonPartitionedTopicAsync(String 
topic){
        TopicName tn = validateTopic(topic);
        WebTarget path = topicPath(tn);
@@ -240,6 +254,13 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public CompletableFuture<Void> createMissedPartitionsAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "createMissedPartitions");
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void updatePartitionedTopic(String topic, int numPartitions)
             throws PulsarAdminException {
         try {
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 06775e1..f751cc2 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -713,6 +713,8 @@ public class PulsarClientException extends IOException {
             return new ChecksumException(msg);
         } else if (cause instanceof CryptoException) {
             return new CryptoException(msg);
+        } else if (cause instanceof TopicDoesNotExistException) {
+            return new TopicDoesNotExistException(msg);
         } else {
             return new PulsarClientException(t);
         }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9a2ae1b..af5aa02 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -82,6 +82,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("expire-messages-all-subscriptions", new 
ExpireMessagesForAllSubscriptions());
 
         jcommander.addCommand("create-partitioned-topic", new 
CreatePartitionedCmd());
+        jcommander.addCommand("create-missed-partitions", new 
CreateMissedPartitionsCmd());
         jcommander.addCommand("create", new CreateNonPartitionedCmd());
         jcommander.addCommand("update-partitioned-topic", new 
UpdatePartitionedCmd());
         jcommander.addCommand("get-partitioned-topic-metadata", new 
GetPartitionedTopicMetadataCmd());
@@ -214,6 +215,21 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Try to create partitions for partitioned 
topic. \n"
+            + "\t\t     The partitions of partition topic has to be created, 
can be used by repair partitions when \n"
+            + "\t\t     topic auto creation is disabled")
+    private class CreateMissedPartitionsCmd extends CliCommand {
+
+        @Parameter(description = "persistent://tenant/namespace/topic\n", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            topics.createMissedPartitions(topic);
+        }
+    }
+
     @Parameters(commandDescription = "Create a non-partitioned topic.")
     private class CreateNonPartitionedCmd extends CliCommand {
        
diff --git a/site2/docs/admin-api-partitioned-topics.md 
b/site2/docs/admin-api-partitioned-topics.md
index fe0ab98..a6507e0 100644
--- a/site2/docs/admin-api-partitioned-topics.md
+++ b/site2/docs/admin-api-partitioned-topics.md
@@ -62,6 +62,34 @@ int numPartitions = 4;
 admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);
 ```
 
+### Create missed partitions
+
+Try to create partitions for partitioned topic. The partitions of partition 
topic has to be created, 
+can be used by repair partitions when topic auto creation is disabled
+
+#### pulsar-admin
+
+You can create missed partitions using the 
[`create-missed-partitions`](reference-pulsar-admin.md#create-missed-partitions)
+command and specifying the topic name as an argument.
+
+Here's an example:
+
+```shell
+$ bin/pulsar-admin topics create-missed-partitions \
+  persistent://my-tenant/my-namespace/my-topic \
+```
+
+#### REST API
+
+{@inject: 
endpoint|POST|/admin/v2/persistent/:tenant/:namespace/:topic|operation/createMissedPartitions}
+
+#### Java
+
+```java
+String topicName = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().createMissedPartitions(topicName);
+```
+
 ### Get metadata
 
 Partitioned topics have metadata associated with them that you can fetch as a 
JSON object.
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 6e9f7a3..af8c88b 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1603,6 +1603,7 @@ Subcommands
 * `offload`
 * `offload-status`
 * `create-partitioned-topic`
+* `create-missed-partitions`
 * `delete-partitioned-topic`
 * `create`
 * `get-partitioned-topic-metadata`
@@ -1704,6 +1705,15 @@ Options
 |---|---|---|
 |`-p`, `--partitions`|The number of partitions for the topic|0|
 
+### `create-missed-partitions`
+Try to create partitions for partitioned topic. The partitions of partition 
topic has to be created, 
+can be used by repair partitions when topic auto creation is disabled
+
+Usage
+```bash
+$ pulsar-admin topics create-missed-partitions 
persistent://tenant/namespace/topic
+```
+
 ### `delete-partitioned-topic`
 Delete a partitioned topic. This will also delete all the partitions of the 
topic if they exist.
 

Reply via email to