This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19ccfd5  Fix create partitioned topic with a substring of an existing 
topic name. (#6478)
19ccfd5 is described below

commit 19ccfd5c60020a32bceeca128a9846ca006f0dc7
Author: lipenghui <peng...@apache.org>
AuthorDate: Fri Mar 6 16:50:10 2020 +0800

    Fix create partitioned topic with a substring of an existing topic name. 
(#6478)
    
    Fixes #6468
    
    Fix create a partitioned topic with a substring of an existing topic name. 
And make create partitioned topic async.
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 114 ++++++++++++++++++++-
 .../broker/admin/impl/PersistentTopicsBase.java    |  97 +++---------------
 .../broker/admin/v1/NonPersistentTopics.java       |  34 +-----
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  16 ++-
 .../broker/admin/v2/NonPersistentTopics.java       |  37 ++-----
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  29 ++++--
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   8 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  57 ++++++++---
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   4 +-
 10 files changed, 227 insertions(+), 175 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 722da4f..a216982 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -36,6 +37,7 @@ import java.util.stream.Collectors;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
@@ -46,6 +48,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -255,16 +258,19 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return namespaces;
     }
 
-    protected void tryCreatePartitionsAsync(int numPartitions) {
+    protected CompletableFuture<Void> tryCreatePartitionsAsync(int 
numPartitions) {
         if (!topicName.isPersistent()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
+        List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
         for (int i = 0; i < numPartitions; i++) {
-            tryCreatePartitionAsync(i);
+            futures.add(tryCreatePartitionAsync(i, null));
         }
+        return FutureUtil.waitForAll(futures);
     }
 
-    private void tryCreatePartitionAsync(final int partition) {
+    private CompletableFuture<Void> tryCreatePartitionAsync(final int 
partition, CompletableFuture<Void> reuseFuture) {
+        CompletableFuture<Void> result = reuseFuture == null ? new 
CompletableFuture<>() : reuseFuture;
         zkCreateOptimisticAsync(localZk(), 
ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
             (rc, s, o, s1) -> {
                 if (KeeperException.Code.OK.intValue() == rc) {
@@ -272,18 +278,22 @@ public abstract class AdminResource extends 
PulsarWebResource {
                         log.debug("[{}] Topic partition {} created.", 
clientAppId(),
                             topicName.getPartition(partition));
                     }
+                    result.complete(null);
                 } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                     log.info("[{}] Topic partition {} is exists, doing 
nothing.", clientAppId(),
                         topicName.getPartition(partition));
+                    
result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
                 } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
                     log.warn("[{}] Fail to create topic partition {} with 
concurrent modification, retry now.",
                             clientAppId(), topicName.getPartition(partition));
-                    tryCreatePartitionAsync(partition);
+                    tryCreatePartitionAsync(partition, result);
                 } else {
                     log.error("[{}] Fail to create topic partition {}", 
clientAppId(),
                         topicName.getPartition(partition), 
KeeperException.create(KeeperException.Code.get(rc)));
+                    
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                 }
         });
+        return result;
     }
 
     protected NamespaceName namespaceName;
@@ -707,4 +717,98 @@ public abstract class AdminResource extends 
PulsarWebResource {
         partitionedTopics.sort(null);
         return partitionedTopics;
     }
+
+    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, 
int numPartitions) {
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+        if (numPartitions <= 0) {
+            asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, 
"Number of partitions should be more than 0"));
+            return;
+        }
+        checkTopicExistsAsync(topicName).thenAccept(exists -> {
+            if (exists) {
+                log.warn("[{}] Failed to create already existing topic {}", 
clientAppId(), topicName);
+                asyncResponse.resume(new RestException(Status.CONFLICT, "This 
topic already exists"));
+            } else {
+                try {
+                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
+                    byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
+                    zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, 
s1) -> {
+                        if (KeeperException.Code.OK.intValue() == rc) {
+                            globalZk().sync(path, (rc2, s2, ctx) -> {
+                                if (KeeperException.Code.OK.intValue() == rc2) 
{
+                                    log.info("[{}] Successfully created 
partitioned topic {}", clientAppId(), topicName);
+                                    
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
+                                        log.info("[{}] Successfully created 
partitions for topic {}", clientAppId(), topicName);
+                                        
asyncResponse.resume(Response.noContent().build());
+                                    }).exceptionally(e -> {
+                                        log.error("[{}] Failed to create 
partitions for topic {}", clientAppId(), topicName);
+                                        // The partitioned topic is created 
but there are some partitions create failed
+                                        asyncResponse.resume(new 
RestException(e));
+                                        return null;
+                                    });
+                                } else {
+                                    log.error("[{}] Failed to create 
partitioned topic {}", clientAppId(), topicName, 
KeeperException.create(KeeperException.Code.get(rc2)));
+                                    asyncResponse.resume(new 
RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+                                }
+                            }, null);
+                        } else if (KeeperException.Code.NODEEXISTS.intValue() 
== rc) {
+                            log.warn("[{}] Failed to create already existing 
partitioned topic {}", clientAppId(), topicName);
+                            asyncResponse.resume(new 
RestException(Status.CONFLICT, "Partitioned topic already exists"));
+                        } else if (KeeperException.Code.BADVERSION.intValue() 
== rc) {
+                            log.warn("[{}] Failed to create partitioned topic 
{}: concurrent modification", clientAppId(),
+                                    topicName);
+                            asyncResponse.resume(new 
RestException(Status.CONFLICT, "Concurrent modification"));
+                        } else {
+                            log.error("[{}] Failed to create partitioned topic 
{}", clientAppId(), topicName, 
KeeperException.create(KeeperException.Code.get(rc)));
+                            asyncResponse.resume(new 
RestException(KeeperException.create(KeeperException.Code.get(rc))));
+                        }
+                    });
+                } catch (Exception e) {
+                    log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                }
+            }
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
+    }
+
+    /**
+     * Check the exists topics contains the given topic.
+     * Since there are topic partitions and non-partitioned topics in Pulsar, 
must ensure both partitions
+     * and non-partitioned topics are not duplicated. So, if compare with a 
partition name, we should compare
+     * to the partitioned name of this partition.
+     *
+     * @param topicName given topic name
+     */
+    protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName 
topicName) {
+        return 
pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(),
+                PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                .thenCompose(topics -> {
+                    boolean exists = false;
+                    for (String topic : topics) {
+                        if 
(topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName()))
 {
+                            exists = true;
+                            break;
+                        }
+                    }
+                    return CompletableFuture.completedFuture(exists);
+                });
+    }
+
+    protected void resumeAsyncResponseExceptionally(AsyncResponse 
asyncResponse, Throwable throwable) {
+        if (throwable instanceof WebApplicationException) {
+            asyncResponse.resume(throwable);
+        } else {
+            asyncResponse.resume(new RestException(throwable));
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e37f09d..f2a95d1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import org.apache.pulsar.common.api.proto.PulsarApi;
+
 import static org.apache.pulsar.common.util.Codec.decode;
 
 import com.github.zafarkhaja.semver.Version;
@@ -390,46 +390,6 @@ public class PersistentTopicsBase extends AdminResource {
         revokePermissions(topicName.toString(), role);
     }
 
-    protected void internalCreatePartitionedTopic(int numPartitions) {
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 0");
-        }
-        validatePartitionTopicName(topicName.getLocalName());
-        try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), 
PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", 
clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already 
exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-        try {
-            String path = ZkAdminPaths.partitionedTopicPath(topicName);
-            byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            tryCreatePartitionsAsync(numPartitions);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic 
already exists");
-        } catch (KeeperException.BadVersionException e) {
-                log.warn("[{}] Failed to create partitioned topic {}: 
concurrent modification", clientAppId(),
-                        topicName);
-                throw new RestException(Status.CONFLICT, "Concurrent 
modification");
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-    }
-
     protected void internalCreateNonPartitionedTopic(boolean authoritative) {
         validateAdminAccessForTenant(topicName.getTenant());
         validateNonPartitionTopicName(topicName.getLocalName());
@@ -540,11 +500,22 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected void internalCreateMissedPartitions() {
-        PartitionedTopicMetadata metadata = 
getPartitionedTopicMetadata(topicName, false, false);
-        if (metadata != null) {
-            tryCreatePartitionsAsync(metadata.partitions);
-        }
+    protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) 
{
+        getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
+            if (metadata != null) {
+                tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(e -> {
+                    log.error("[{}] Failed to create partitions for topic {}", 
clientAppId(), topicName);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
+            }
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to create partitions for topic {}", 
clientAppId(), topicName);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return null;
+        });
     }
 
     private CompletableFuture<Void> updatePartitionInOtherCluster(int 
numPartitions, Set<String> clusters) {
@@ -2072,40 +2043,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     /**
-     * Validate partitioned topic name.
-     * Validation will fail and throw RestException if
-     * 1) There's already a partitioned topic with same topic name and have 
some of its partition created.
-     * 2) There's already non partition topic with same name and contains 
partition suffix "-partition-"
-     * followed by numeric value. In this case internal created partition of 
partitioned topic could override
-     * the existing non partition topic.
-     *
-     * @param topicName
-     */
-    private void validatePartitionTopicName(String topicName) {
-        List<String> existingTopicList = internalGetList();
-        String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
-        for (String existingTopicName : existingTopicList) {
-            if (existingTopicName.contains(prefix)) {
-                try {
-                    Long.parseLong(existingTopicName.substring(
-                            
existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
-                                    + 
TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
-                    log.warn("[{}] Already have topic {} which contains 
partition " +
-                            "suffix '-partition-' and end with numeric value. 
Creation of partitioned topic {}"
-                            + "could cause conflict.", clientAppId(), 
existingTopicName, topicName);
-                    throw new RestException(Status.PRECONDITION_FAILED,
-                            "Already have topic " + existingTopicName + " 
which contains partition suffix '-partition-' " +
-                                    "and end with numeric value, Creation of 
partitioned topic " + topicName +
-                                    " could cause conflict.");
-                } catch (NumberFormatException e) {
-                    // Do nothing, if value after partition suffix is not pure 
numeric value,
-                    // as it can't conflict with internal created partitioned 
topic's name.
-                }
-            }
-        }
-    }
-
-    /**
      * Validate non partition topic name,
      * Validation will fail and throw RestException if
      * 1) Topic name contains partition suffix "-partition-" and the remaining 
part follow the partition
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 4bc0ddf..2338b0f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -124,41 +124,15 @@ public class NonPersistentTopics extends PersistentTopics 
{
     @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes 
= "It needs to be called before creating a producer on a partitioned topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
-    public void createPartitionedTopic(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
+    public void createPartitionedTopic(@Suspended final AsyncResponse 
asyncResponse, @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 0");
-        }
         try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), 
PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", 
clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already 
exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-        try {
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
namespaceName.toString(), domain(),
-                    topicName.getEncodedLocalName());
-            byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic 
already exist");
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 836ca14..362adc8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -57,7 +57,8 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import javax.ws.rs.container.AsyncResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
@@ -66,7 +67,7 @@ import javax.ws.rs.container.AsyncResponse;
 @Api(value = "/persistent", description = "Persistent topic admin apis", tags 
= "persistent topic", hidden = true)
 @SuppressWarnings("deprecation")
 public class PersistentTopics extends PersistentTopicsBase {
-
+    private static final Logger log = 
LoggerFactory.getLogger(PersistentTopics.class);
     @GET
     @Path("/{property}/{cluster}/{namespace}")
     @ApiOperation(hidden = true, value = "Get the list of topics under a 
namespace.", response = String.class, responseContainer = "List")
@@ -147,11 +148,16 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
-    public void createPartitionedTopic(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
+    public void createPartitionedTopic(@Suspended final AsyncResponse 
asyncResponse, @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 7e88eed..3756f82 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -164,6 +164,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration"),
     })
     public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -172,39 +173,15 @@ public class NonPersistentTopics extends PersistentTopics 
{
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic", 
required = true, type = "int", defaultValue = "0")
             int numPartitions) {
-        validateGlobalNamespaceOwnership(tenant,namespace);
-        validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 0");
-        }
-        try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), 
PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", 
clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already 
exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
+
         try {
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
namespaceName.toString(), domain(),
-                    topicName.getEncodedLocalName());
-            byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic 
{}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic 
already exists");
+            validateGlobalNamespaceOwnership(tenant,namespace);
+            validateTopicName(tenant, namespace, encodedTopic);
+            validateAdminAccessForTenant(topicName.getTenant());
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-            throw new RestException(e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 8c59fa5..b2fc28b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -55,6 +55,9 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.ApiParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.pulsar.common.util.Codec.decode;
 
 /**
@@ -192,6 +195,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration")
     })
     public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -200,9 +204,15 @@ public class PersistentTopics extends PersistentTopicsBase 
{
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic", 
required = true, type = "int", defaultValue = "0")
             int numPartitions) {
-        validateGlobalNamespaceOwnership(tenant,namespace);
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions);
+        try {
+            validateGlobalNamespaceOwnership(tenant,namespace);
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            validateAdminAccessForTenant(topicName.getTenant());
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     @PUT
@@ -276,7 +286,7 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @POST
     @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
-    @ApiOperation(value = "Create missed partitions of an existing partitioned 
topic.", notes = "This is a best-effort operation for create missed partitions 
of existing non-global partitioned-topic and does't throw any exceptions when 
create failed")
+    @ApiOperation(value = "Create missed partitions of an existing partitioned 
topic.")
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
             @ApiResponse(code = 401, message = "Don't have permission to 
adminisActions to be grantedtrate resources on this tenant"),
@@ -287,6 +297,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 500, message = "Internal server error")
     })
     public void createMissedPartitions(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -294,8 +305,12 @@ public class PersistentTopics extends PersistentTopicsBase 
{
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic) {
 
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        internalCreateMissedPartitions();
+        try {
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            internalCreateMissedPartitions(asyncResponse);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     @GET
@@ -1072,4 +1087,6 @@ public class PersistentTopics extends 
PersistentTopicsBase {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalGetLastMessageId(authoritative);
     }
+
+    private static final Logger log = 
LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f0ee4f6..d832628 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -919,9 +919,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         try {
             admin.topics().createPartitionedTopic(partitionedTopicName, 32);
             fail("Should have failed as the partitioned topic already exists");
-        } catch (PreconditionFailedException e) {
-            // Expecting PreconditionFailedException instead of 
ConflictException as it'll
-            // fail validation before actually try to create metadata in ZK.
+        } catch (ConflictException ignore) {
         }
 
         producer = client.newProducer(Schema.BYTES)
@@ -2010,6 +2008,10 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         } catch (PulsarAdminException e) {
             assertTrue(e instanceof ConflictException);
         }
+
+        // Check create partitioned topic with substring topic name
+        
admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/create_substring_topic",
 1);
+        
admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/substring_topic",
 1);
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 00d1a31..3cae752 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -677,7 +677,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest 
{
         verify(response, times(1)).resume(Lists.newArrayList());
         // create topic
         assertEquals(persistentTopics.getPartitionedTopicList(property, 
cluster, namespace), Lists.newArrayList());
-        persistentTopics.createPartitionedTopic(property, cluster, namespace, 
topic, 5);
+        response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, property, cluster, 
namespace, topic, 5);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         assertEquals(persistentTopics.getPartitionedTopicList(property, 
cluster, namespace), Lists
                 .newArrayList(String.format("persistent://%s/%s/%s/%s", 
property, cluster, namespace, topic)));
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 1825d31..cac35a6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -56,6 +56,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -157,13 +158,17 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
                 "Partitioned Topic not found: 
persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero 
partitions");
 
         // 3) Create the partitioned topic
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
testLocalTopicName, 3);
+        response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, testLocalTopicName, 3);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
         // 4) Create a subscription
         response = mock(AsyncResponse.class);
         persistentTopics.createSubscription(response, testTenant, 
testNamespace, testLocalTopicName, "test", true,
                 (MessageIdImpl) MessageId.earliest, false);
-        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
@@ -239,7 +244,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
topicName, true);
     }
 
-    @Test(expectedExceptions = RestException.class)
+    @Test
     public void 
testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws 
KeeperException, InterruptedException {
         // Test the case in which user already has topic like 
topic-name-partition-123 created before we enforce the validation.
         final String nonPartitionTopicName1 = "standard-topic";
@@ -250,7 +255,12 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName1, 
nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
partitionedTopicName, 5);
+        
doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1,
 
nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<RestException> errCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, 5);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), 
Response.Status.CONFLICT.getStatusCode());
     }
 
     @Test(expectedExceptions = RestException.class)
@@ -263,13 +273,18 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         
doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
+        
doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
         doAnswer(invocation -> {
             persistentTopics.namespaceName = NamespaceName.get("tenant", 
"namespace");
             persistentTopics.topicName = TopicName.get("persistent", "tenant", 
"cluster", "namespace", "topicname");
             return null;
         }).when(persistentTopics).validatePartitionedTopicName(any(), any(), 
any());
         
doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
partitionedTopicName, 5);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, 5);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         persistentTopics.updatePartitionedTopic(testTenant, testNamespace, 
partitionedTopicName, true, 10);
     }
 
@@ -295,7 +310,11 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 
         // 3) create partitioned topic and unload
         response = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
partitionTopicName, 6);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionTopicName, 6);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+        response = mock(AsyncResponse.class);
         persistentTopics.unloadTopic(response, testTenant, testNamespace, 
partitionTopicName, true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -320,10 +339,17 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testGetPartitionedTopicsList() throws KeeperException, 
InterruptedException, PulsarAdminException {
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, "test-topic1", 3);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
"test-topic1", 3);
-
-        nonPersistentTopic.createPartitionedTopic(testTenant, testNamespace, 
"test-topic2", 3);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        nonPersistentTopic.createPartitionedTopic(response, testTenant, 
testNamespace, "test-topic2", 3);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
         List<String> persistentPartitionedTopics = 
persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
 
@@ -351,7 +377,11 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     public void testGrantPartitionedTopic() {
         final String partitionedTopicName = "partitioned-topic";
         final int numPartitions = 5;
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
partitionedTopicName, numPartitions);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, numPartitions);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
@@ -387,8 +417,11 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     public void testRevokePartitionedTopic() {
         final String partitionedTopicName = "partitioned-topic";
         final int numPartitions = 5;
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, 
partitionedTopicName, numPartitions);
-
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, numPartitions);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 373a6b7..78e3dc1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -879,9 +879,7 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         try {
             admin.topics().createPartitionedTopic(partitionedTopicName, 32);
             fail("Should have failed as the partitioned topic exists with its 
partition created");
-        } catch (PreconditionFailedException e) {
-            // Expecting PreconditionFailedException instead of 
ConflictException as it'll
-            // fail validation before actually try to create metadata in ZK.
+        } catch (ConflictException ignore) {
         }
 
         producer = client.newProducer(Schema.BYTES)

Reply via email to