sijie closed pull request #2342: Issue 1067: Problems with Partitioned Topics
which name contains -partition-N
URL: https://github.com/apache/incubator-pulsar/pull/2342
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index be1b7dde12..de7dd0b7ed 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -254,6 +254,15 @@ protected void validateTopicName(String property, String
namespace, String encod
this.topicName = TopicName.get(domain(), namespaceName, topic);
}
+ protected void validatePartitionedTopicName(String tenant, String
namespace, String encodedTopic) {
+ // first, it has to be a validate topic name
+ validateTopicName(tenant, namespace, encodedTopic);
+ // second, "-partition-" is not allowed
+ if (encodedTopic.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Partitioned
Topic Name should not contain '-partition-'");
+ }
+ }
+
@Deprecated
protected void validateTopicName(String property, String cluster, String
namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6a246b8b4e..771a057e16 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -124,12 +124,15 @@ public void revokePermissionsOnTopic(@PathParam("tenant")
String tenant,
@PUT
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Create a partitioned topic.", notes = "It needs to
be called before creating a producer on a partitioned topic.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
- @ApiResponse(code = 409, message = "Partitioned topic already
exist") })
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 409, message = "Partitioned topic already exist"),
+ @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+ })
public void createPartitionedTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, int
numPartitions,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateTopicName(tenant, namespace, encodedTopic);
+ validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalCreatePartitionedTopic(numPartitions, authoritative);
}
@@ -151,18 +154,25 @@ public void createPartitionedTopic(@PathParam("tenant")
String tenant, @PathPara
@POST
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Increment partitons of an existing partitioned
topic.", notes = "It only increments partitions of existing non-global
partitioned-topic")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
- @ApiResponse(code = 409, message = "Partitioned topic does not
exist") })
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 409, message = "Partitioned topic does not exist"),
+ @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+ })
public void updatePartitionedTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, int
numPartitions) {
- validateTopicName(tenant, namespace, encodedTopic);
+ validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalUpdatePartitionedTopic(numPartitions);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Get partitioned topic metadata.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 409, message = "Partitioned topic does not exist"),
+ @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+ })
public PartitionedTopicMetadata
getPartitionedMetadata(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
@@ -173,13 +183,16 @@ public PartitionedTopicMetadata
getPartitionedMetadata(@PathParam("tenant") Stri
@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.", notes = "It will also
delete all the partitions of the topic if it exists.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
- @ApiResponse(code = 404, message = "Partitioned topic does not
exist") })
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Partitioned topic does not exist"),
+ @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+ })
public void deletePartitionedTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateTopicName(tenant, namespace, encodedTopic);
+ validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalDeletePartitionedTopic(authoritative, force);
}
@@ -259,12 +272,15 @@ public void getManagedLedgerInfo(@PathParam("tenant")
String tenant, @PathParam(
@GET
@Path("{tenant}/{namespace}/{topic}/partitioned-stats")
@ApiOperation(value = "Get the stats for the partitioned topic.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
- @ApiResponse(code = 404, message = "Topic does not exist") })
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+ })
public PartitionedTopicStats getPartitionedStats(@PathParam("tenant")
String tenant,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateTopicName(tenant, namespace, encodedTopic);
+ validatePartitionedTopicName(tenant, namespace, encodedTopic);
return internalGetPartitionedStats(authoritative);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
new file mode 100644
index 0000000000..7ad60c2b45
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import javax.ws.rs.core.Response.Status;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.util.Codec;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link AdminResource}.
+ */
+public class AdminResourceTest {
+
+ private static AdminResource mockResource() {
+ return new AdminResource() {
+
+ @Override
+ protected String domain() {
+ return "persistent";
+ }
+ };
+ }
+
+ @Test
+ public void testValidatePartitionedTopicNameSuccess() {
+ String tenant = "test-tenant";
+ String namespace = "test-namespace";
+ String topic = Codec.encode("test-topic");
+
+ AdminResource resource = mockResource();
+ resource.validatePartitionedTopicName(tenant, namespace, topic);
+ }
+
+ @Test
+ public void testValidatePartitionedTopicNameInvalid() {
+ String tenant = "test-tenant";
+ String namespace = "test-namespace";
+ String topic = Codec.encode("test-topic-partition-0");
+
+ AdminResource resource = mockResource();
+ try {
+ resource.validatePartitionedTopicName(tenant, namespace, topic);
+ fail("Should fail validation on invalid partitioned topic");
+ } catch (RestException re) {
+ assertEquals(Status.PRECONDITION_FAILED.getStatusCode(),
re.getResponse().getStatus());
+ }
+ }
+
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index ef455986c3..3d43271974 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -44,7 +44,7 @@
public static final String PUBLIC_TENANT = "public";
public static final String DEFAULT_NAMESPACE = "default";
- private static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
+ public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
private final String completeTopicName;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services