This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 19ccfd5 Fix create partitioned topic with a substring of an existing topic name. (#6478) 19ccfd5 is described below commit 19ccfd5c60020a32bceeca128a9846ca006f0dc7 Author: lipenghui <peng...@apache.org> AuthorDate: Fri Mar 6 16:50:10 2020 +0800 Fix create partitioned topic with a substring of an existing topic name. (#6478) Fixes #6468 Fix create a partitioned topic with a substring of an existing topic name. And make create partitioned topic async. --- .../apache/pulsar/broker/admin/AdminResource.java | 114 ++++++++++++++++++++- .../broker/admin/impl/PersistentTopicsBase.java | 97 +++--------------- .../broker/admin/v1/NonPersistentTopics.java | 34 +----- .../pulsar/broker/admin/v1/PersistentTopics.java | 16 ++- .../broker/admin/v2/NonPersistentTopics.java | 37 ++----- .../pulsar/broker/admin/v2/PersistentTopics.java | 29 ++++-- .../apache/pulsar/broker/admin/AdminApiTest.java | 8 +- .../org/apache/pulsar/broker/admin/AdminTest.java | 6 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 57 ++++++++--- .../pulsar/broker/admin/v1/V1_AdminApiTest.java | 4 +- 10 files changed, 227 insertions(+), 175 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 722da4f..a216982 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import java.net.MalformedURLException; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -36,6 +37,7 @@ import java.util.stream.Collectors; import javax.servlet.ServletContext; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; @@ -46,6 +48,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -255,16 +258,19 @@ public abstract class AdminResource extends PulsarWebResource { return namespaces; } - protected void tryCreatePartitionsAsync(int numPartitions) { + protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) { if (!topicName.isPersistent()) { - return; + return CompletableFuture.completedFuture(null); } + List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions); for (int i = 0; i < numPartitions; i++) { - tryCreatePartitionAsync(i); + futures.add(tryCreatePartitionAsync(i, null)); } + return FutureUtil.waitForAll(futures); } - private void tryCreatePartitionAsync(final int partition) { + private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) { + CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture; zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], (rc, s, o, s1) -> { if (KeeperException.Code.OK.intValue() == rc) { @@ -272,18 +278,22 @@ public abstract class AdminResource extends PulsarWebResource { log.debug("[{}] Topic partition {} created.", clientAppId(), topicName.getPartition(partition)); } + result.complete(null); } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(), topicName.getPartition(partition)); + result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS)); } else if (KeeperException.Code.BADVERSION.intValue() == rc) { log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.", clientAppId(), topicName.getPartition(partition)); - tryCreatePartitionAsync(partition); + tryCreatePartitionAsync(partition, result); } else { log.error("[{}] Fail to create topic partition {}", clientAppId(), topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc))); + result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } }); + return result; } protected NamespaceName namespaceName; @@ -707,4 +717,98 @@ public abstract class AdminResource extends PulsarWebResource { partitionedTopics.sort(null); return partitionedTopics; } + + protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) { + try { + validateAdminAccessForTenant(topicName.getTenant()); + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + if (numPartitions <= 0) { + asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0")); + return; + } + checkTopicExistsAsync(topicName).thenAccept(exists -> { + if (exists) { + log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); + asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists")); + } else { + try { + String path = ZkAdminPaths.partitionedTopicPath(topicName); + byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); + zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> { + if (KeeperException.Code.OK.intValue() == rc) { + globalZk().sync(path, (rc2, s2, ctx) -> { + if (KeeperException.Code.OK.intValue() == rc2) { + log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); + tryCreatePartitionsAsync(numPartitions).thenAccept(v -> { + log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(e -> { + log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName); + // The partitioned topic is created but there are some partitions create failed + asyncResponse.resume(new RestException(e)); + return null; + }); + } else { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2))); + asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2)))); + } + }, null); + } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { + log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); + asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists")); + } else if (KeeperException.Code.BADVERSION.intValue() == rc) { + log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(), + topicName); + asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification")); + } else { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc))); + asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc)))); + } + }); + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + } + } + }).exceptionally(ex -> { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + /** + * Check the exists topics contains the given topic. + * Since there are topic partitions and non-partitioned topics in Pulsar, must ensure both partitions + * and non-partitioned topics are not duplicated. So, if compare with a partition name, we should compare + * to the partitioned name of this partition. + * + * @param topicName given topic name + */ + protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName) { + return pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(), + PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .thenCompose(topics -> { + boolean exists = false; + for (String topic : topics) { + if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) { + exists = true; + break; + } + } + return CompletableFuture.completedFuture(exists); + }); + } + + protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { + if (throwable instanceof WebApplicationException) { + asyncResponse.resume(throwable); + } else { + asyncResponse.resume(new RestException(throwable)); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index e37f09d..f2a95d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -20,7 +20,7 @@ package org.apache.pulsar.broker.admin.impl; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; -import org.apache.pulsar.common.api.proto.PulsarApi; + import static org.apache.pulsar.common.util.Codec.decode; import com.github.zafarkhaja.semver.Version; @@ -390,46 +390,6 @@ public class PersistentTopicsBase extends AdminResource { revokePermissions(topicName.toString(), role); } - protected void internalCreatePartitionedTopic(int numPartitions) { - validateAdminAccessForTenant(topicName.getTenant()); - if (numPartitions <= 0) { - throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); - } - validatePartitionTopicName(topicName.getLocalName()); - try { - boolean topicExist = pulsar().getNamespaceService() - .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) - .join() - .contains(topicName.toString()); - if (topicExist) { - log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); - throw new RestException(Status.CONFLICT, "This topic already exists"); - } - } catch (Exception e) { - log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); - throw new RestException(e); - } - try { - String path = ZkAdminPaths.partitionedTopicPath(topicName); - byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); - zkCreateOptimistic(path, data); - tryCreatePartitionsAsync(numPartitions); - // Sync data to all quorums and the observers - zkSync(path); - log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); - } catch (KeeperException.NodeExistsException e) { - log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); - throw new RestException(Status.CONFLICT, "Partitioned topic already exists"); - } catch (KeeperException.BadVersionException e) { - log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(), - topicName); - throw new RestException(Status.CONFLICT, "Concurrent modification"); - } catch (Exception e) { - log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); - throw new RestException(e); - } - } - protected void internalCreateNonPartitionedTopic(boolean authoritative) { validateAdminAccessForTenant(topicName.getTenant()); validateNonPartitionTopicName(topicName.getLocalName()); @@ -540,11 +500,22 @@ public class PersistentTopicsBase extends AdminResource { } } - protected void internalCreateMissedPartitions() { - PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false); - if (metadata != null) { - tryCreatePartitionsAsync(metadata.partitions); - } + protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { + getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { + if (metadata != null) { + tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> { + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(e -> { + log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName); + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); + } + }).exceptionally(e -> { + log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName); + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); } private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) { @@ -2072,40 +2043,6 @@ public class PersistentTopicsBase extends AdminResource { } /** - * Validate partitioned topic name. - * Validation will fail and throw RestException if - * 1) There's already a partitioned topic with same topic name and have some of its partition created. - * 2) There's already non partition topic with same name and contains partition suffix "-partition-" - * followed by numeric value. In this case internal created partition of partitioned topic could override - * the existing non partition topic. - * - * @param topicName - */ - private void validatePartitionTopicName(String topicName) { - List<String> existingTopicList = internalGetList(); - String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX; - for (String existingTopicName : existingTopicList) { - if (existingTopicName.contains(prefix)) { - try { - Long.parseLong(existingTopicName.substring( - existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX) - + TopicName.PARTITIONED_TOPIC_SUFFIX.length())); - log.warn("[{}] Already have topic {} which contains partition " + - "suffix '-partition-' and end with numeric value. Creation of partitioned topic {}" - + "could cause conflict.", clientAppId(), existingTopicName, topicName); - throw new RestException(Status.PRECONDITION_FAILED, - "Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " + - "and end with numeric value, Creation of partitioned topic " + topicName + - " could cause conflict."); - } catch (NumberFormatException e) { - // Do nothing, if value after partition suffix is not pure numeric value, - // as it can't conflict with internal created partitioned topic's name. - } - } - } - } - - /** * Validate non partition topic name, * Validation will fail and throw RestException if * 1) Topic name contains partition suffix "-partition-" and the remaining part follow the partition diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 4bc0ddf..2338b0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -124,41 +124,15 @@ public class NonPersistentTopics extends PersistentTopics { @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 409, message = "Partitioned topic already exist") }) - public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, + public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, int numPartitions) { - validateTopicName(property, cluster, namespace, encodedTopic); - validateAdminAccessForTenant(topicName.getTenant()); - if (numPartitions <= 0) { - throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); - } try { - boolean topicExist = pulsar().getNamespaceService() - .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) - .join() - .contains(topicName.toString()); - if (topicExist) { - log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); - throw new RestException(Status.CONFLICT, "This topic already exists"); - } - } catch (Exception e) { - log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); - throw new RestException(e); - } - try { - String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), - topicName.getEncodedLocalName()); - byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); - zkCreateOptimistic(path, data); - // Sync data to all quorums and the observers - zkSync(path); - log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); - } catch (KeeperException.NodeExistsException e) { - log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); - throw new RestException(Status.CONFLICT, "Partitioned topic already exist"); + validateTopicName(property, cluster, namespace, encodedTopic); + internalCreatePartitionedTopic(asyncResponse, numPartitions); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); - throw new RestException(e); + resumeAsyncResponseExceptionally(asyncResponse, e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 836ca14..362adc8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -57,7 +57,8 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import javax.ws.rs.container.AsyncResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** */ @@ -66,7 +67,7 @@ import javax.ws.rs.container.AsyncResponse; @Api(value = "/persistent", description = "Persistent topic admin apis", tags = "persistent topic", hidden = true) @SuppressWarnings("deprecation") public class PersistentTopics extends PersistentTopicsBase { - + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); @GET @Path("/{property}/{cluster}/{namespace}") @ApiOperation(hidden = true, value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List") @@ -147,11 +148,16 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 409, message = "Partitioned topic already exist") }) - public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, + public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, int numPartitions) { - validateTopicName(property, cluster, namespace, encodedTopic); - internalCreatePartitionedTopic(numPartitions); + try { + validateTopicName(property, cluster, namespace, encodedTopic); + internalCreatePartitionedTopic(asyncResponse, numPartitions); + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + } } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 7e88eed..3756f82 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -164,6 +164,7 @@ public class NonPersistentTopics extends PersistentTopics { @ApiResponse(code = 503, message = "Failed to validate global cluster configuration"), }) public void createPartitionedTopic( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -172,39 +173,15 @@ public class NonPersistentTopics extends PersistentTopics { @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") int numPartitions) { - validateGlobalNamespaceOwnership(tenant,namespace); - validateTopicName(tenant, namespace, encodedTopic); - validateAdminAccessForTenant(topicName.getTenant()); - if (numPartitions <= 0) { - throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); - } - try { - boolean topicExist = pulsar().getNamespaceService() - .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) - .join() - .contains(topicName.toString()); - if (topicExist) { - log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); - throw new RestException(Status.CONFLICT, "This topic already exists"); - } - } catch (Exception e) { - log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); - throw new RestException(e); - } + try { - String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), - topicName.getEncodedLocalName()); - byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); - zkCreateOptimistic(path, data); - // Sync data to all quorums and the observers - zkSync(path); - log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); - } catch (KeeperException.NodeExistsException e) { - log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); - throw new RestException(Status.CONFLICT, "Partitioned topic already exists"); + validateGlobalNamespaceOwnership(tenant,namespace); + validateTopicName(tenant, namespace, encodedTopic); + validateAdminAccessForTenant(topicName.getTenant()); + internalCreatePartitionedTopic(asyncResponse, numPartitions); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); - throw new RestException(e); + resumeAsyncResponseExceptionally(asyncResponse, e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 8c59fa5..b2fc28b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -55,6 +55,9 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import io.swagger.annotations.ApiParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.pulsar.common.util.Codec.decode; /** @@ -192,6 +195,7 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") }) public void createPartitionedTopic( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -200,9 +204,15 @@ public class PersistentTopics extends PersistentTopicsBase { @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") int numPartitions) { - validateGlobalNamespaceOwnership(tenant,namespace); - validatePartitionedTopicName(tenant, namespace, encodedTopic); - internalCreatePartitionedTopic(numPartitions); + try { + validateGlobalNamespaceOwnership(tenant,namespace); + validatePartitionedTopicName(tenant, namespace, encodedTopic); + validateAdminAccessForTenant(topicName.getTenant()); + internalCreatePartitionedTopic(asyncResponse, numPartitions); + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + } } @PUT @@ -276,7 +286,7 @@ public class PersistentTopics extends PersistentTopicsBase { @POST @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions") - @ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "This is a best-effort operation for create missed partitions of existing non-global partitioned-topic and does't throw any exceptions when create failed") + @ApiOperation(value = "Create missed partitions of an existing partitioned topic.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"), @@ -287,6 +297,7 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiResponse(code = 500, message = "Internal server error") }) public void createMissedPartitions( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -294,8 +305,12 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic) { - validatePartitionedTopicName(tenant, namespace, encodedTopic); - internalCreateMissedPartitions(); + try { + validatePartitionedTopicName(tenant, namespace, encodedTopic); + internalCreateMissedPartitions(asyncResponse); + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + } } @GET @@ -1072,4 +1087,6 @@ public class PersistentTopics extends PersistentTopicsBase { validateTopicName(tenant, namespace, encodedTopic); return internalGetLastMessageId(authoritative); } + + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index f0ee4f6..d832628 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -919,9 +919,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { try { admin.topics().createPartitionedTopic(partitionedTopicName, 32); fail("Should have failed as the partitioned topic already exists"); - } catch (PreconditionFailedException e) { - // Expecting PreconditionFailedException instead of ConflictException as it'll - // fail validation before actually try to create metadata in ZK. + } catch (ConflictException ignore) { } producer = client.newProducer(Schema.BYTES) @@ -2010,6 +2008,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { } catch (PulsarAdminException e) { assertTrue(e instanceof ConflictException); } + + // Check create partitioned topic with substring topic name + admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/create_substring_topic", 1); + admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/substring_topic", 1); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 00d1a31..3cae752 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -677,7 +677,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest { verify(response, times(1)).resume(Lists.newArrayList()); // create topic assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList()); - persistentTopics.createPartitionedTopic(property, cluster, namespace, topic, 5); + response = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists .newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 1825d31..cac35a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -56,6 +56,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -157,13 +158,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { "Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions"); // 3) Create the partitioned topic - persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3); + response = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); // 4) Create a subscription response = mock(AsyncResponse.class); persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl) MessageId.earliest, false); - ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -239,7 +244,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true); } - @Test(expectedExceptions = RestException.class) + @Test public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException { // Test the case in which user already has topic like topic-name-partition-123 created before we enforce the validation. final String nonPartitionTopicName1 = "standard-topic"; @@ -250,7 +255,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService(); doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache(); doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString()); - persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5); + doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString()); + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5); + verify(response, timeout(5000).times(1)).resume(errCaptor.capture()); + Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode()); } @Test(expectedExceptions = RestException.class) @@ -263,13 +273,18 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService(); doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache(); doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString()); + doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString()); doAnswer(invocation -> { persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace"); persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname"); return null; }).when(persistentTopics).validatePartitionedTopicName(any(), any(), any()); doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString()); - persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5); + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, 10); } @@ -295,7 +310,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { // 3) create partitioned topic and unload response = mock(AsyncResponse.class); - persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6); + responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + response = mock(AsyncResponse.class); persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); @@ -320,10 +339,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { @Test public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException { + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - persistentTopics.createPartitionedTopic(testTenant, testNamespace, "test-topic1", 3); - - nonPersistentTopic.createPartitionedTopic(testTenant, testNamespace, "test-topic2", 3); + response = mock(AsyncResponse.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); + nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace); @@ -351,7 +377,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { public void testGrantPartitionedTopic() { final String partitionedTopicName = "partitioned-topic"; final int numPartitions = 5; - persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, numPartitions); + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); String role = "role"; Set<AuthAction> expectActions = new HashSet<>(); @@ -387,8 +417,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { public void testRevokePartitionedTopic() { final String partitionedTopicName = "partitioned-topic"; final int numPartitions = 5; - persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, numPartitions); - + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); String role = "role"; Set<AuthAction> expectActions = new HashSet<>(); expectActions.add(AuthAction.produce); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 373a6b7..78e3dc1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -879,9 +879,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { try { admin.topics().createPartitionedTopic(partitionedTopicName, 32); fail("Should have failed as the partitioned topic exists with its partition created"); - } catch (PreconditionFailedException e) { - // Expecting PreconditionFailedException instead of ConflictException as it'll - // fail validation before actually try to create metadata in ZK. + } catch (ConflictException ignore) { } producer = client.newProducer(Schema.BYTES)