This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 05488a0 Issue 1067: Problems with Partitioned Topics which name contains -partition-N (#2342)
05488a0 is described below

commit 05488a0be6e27292344a093f694d2a5f48886601
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Fri Aug 10 19:40:44 2018 -0700

Issue 1067: Problems with Partitioned Topics which name contains -partition-N (#2342)

* Issue 1067: Problems with Partitioned Topics which name contains -partition-N

### Motivation

Fixes #1067.

Someone accidentally created a partitioned topic in one of our clusters with a name which contains -partition-2. This raised all sorts of issues: it would seem that only one partition was created for this topic, but metadata exists saying that it has 10 partitions.

### Changes

Disallow creating a partitioned topic whose name contains '-partition-'.

* Only validate the partitioned topic name when creating/deleting/updating partitioned topics

* Changed to use a CONSTANT
---
.../apache/pulsar/broker/admin/AdminResource.java | 9 +++
.../pulsar/broker/admin/v2/PersistentTopics.java | 42 +++++++++----
.../pulsar/broker/admin/AdminResourceTest.java | 69 ++++++++++++++++++++++
.../org/apache/pulsar/common/naming/TopicName.java | 2 +-
4 files changed, 108 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index be1b7dd..de7dd0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -254,6 +254,15 @@ public abstract class AdminResource extends PulsarWebResource { this.topicName = TopicName.get(domain(), namespaceName, topic); } + protected void validatePartitionedTopicName(String tenant, String namespace, String encodedTopic) { + // first, it has to be a validate topic name + validateTopicName(tenant, 
namespace, encodedTopic); + // second, "-partition-" is not allowed + if (encodedTopic.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) { + throw new RestException(Status.PRECONDITION_FAILED, "Partitioned Topic Name should not contain '-partition-'"); + } + } + @Deprecated protected void validateTopicName(String property, String cluster, String namespace, String encodedTopic) { String topic = Codec.decode(encodedTopic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 6a246b8..771a057 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -124,12 +124,15 @@ public class PersistentTopics extends PersistentTopicsBase { @PUT @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 409, message = "Partitioned topic already exist") }) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 409, message = "Partitioned topic already exist"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid") + }) public void createPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, int numPartitions, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); + validatePartitionedTopicName(tenant, namespace, encodedTopic); internalCreatePartitionedTopic(numPartitions, authoritative); } @@ -151,18 +154,25 @@ public class PersistentTopics 
extends PersistentTopicsBase { @POST @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 409, message = "Partitioned topic does not exist") }) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 409, message = "Partitioned topic does not exist"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid") + }) public void updatePartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, int numPartitions) { - validateTopicName(tenant, namespace, encodedTopic); + validatePartitionedTopicName(tenant, namespace, encodedTopic); internalUpdatePartitionedTopic(numPartitions); } @GET @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Get partitioned topic metadata.") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 409, message = "Partitioned topic does not exist"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid") + }) public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { @@ -173,13 +183,16 @@ public class PersistentTopics extends PersistentTopicsBase { @DELETE @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Delete a partitioned topic.", notes = "It will also delete all the partitions of the topic if it 
exists.") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Partitioned topic does not exist") }) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Partitioned topic does not exist"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid") + }) public void deletePartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false") boolean force, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); + validatePartitionedTopicName(tenant, namespace, encodedTopic); internalDeletePartitionedTopic(authoritative, force); } @@ -259,12 +272,15 @@ public class PersistentTopics extends PersistentTopicsBase { @GET @Path("{tenant}/{namespace}/{topic}/partitioned-stats") @ApiOperation(value = "Get the stats for the partitioned topic.") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Topic does not exist") }) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid") + }) public PartitionedTopicStats getPartitionedStats(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); + validatePartitionedTopicName(tenant, namespace, encodedTopic); return internalGetPartitionedStats(authoritative); } diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java new file mode 100644 index 0000000..7ad60c2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import javax.ws.rs.core.Response.Status; +import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.util.Codec; +import org.testng.annotations.Test; + +/** + * Unit test {@link AdminResource}. 
+ */ +public class AdminResourceTest { + + private static AdminResource mockResource() { + return new AdminResource() { + + @Override + protected String domain() { + return "persistent"; + } + }; + } + + @Test + public void testValidatePartitionedTopicNameSuccess() { + String tenant = "test-tenant"; + String namespace = "test-namespace"; + String topic = Codec.encode("test-topic"); + + AdminResource resource = mockResource(); + resource.validatePartitionedTopicName(tenant, namespace, topic); + } + + @Test + public void testValidatePartitionedTopicNameInvalid() { + String tenant = "test-tenant"; + String namespace = "test-namespace"; + String topic = Codec.encode("test-topic-partition-0"); + + AdminResource resource = mockResource(); + try { + resource.validatePartitionedTopicName(tenant, namespace, topic); + fail("Should fail validation on invalid partitioned topic"); + } catch (RestException re) { + assertEquals(Status.PRECONDITION_FAILED.getStatusCode(), re.getResponse().getStatus()); + } + } + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index ef45598..3d43271 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -44,7 +44,7 @@ public class TopicName implements ServiceUnitId { public static final String PUBLIC_TENANT = "public"; public static final String DEFAULT_NAMESPACE = "default"; - private static final String PARTITIONED_TOPIC_SUFFIX = "-partition-"; + public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-"; private final String completeTopicName;