This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8add5db6c17 [fix][broker] Reject auto create partitioned topic when topic name contains ``-partition-`` (#14920)
8add5db6c17 is described below
commit 8add5db6c178422a215005dfc0751f28bc6124af
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Apr 8 15:12:24 2022 +0800
[fix][broker] Reject auto create partitioned topic when topic name contains
``-partition-`` (#14920)
---
.../pulsar/broker/PulsarServerException.java | 6 ++++
.../apache/pulsar/broker/admin/AdminResource.java | 4 +++
.../pulsar/broker/service/BrokerService.java | 5 +++
.../service/persistent/PersistentTopicTest.java | 41 ++++++++++++++++++++--
4 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
index 1fd1d077021..40dd53b3baf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
@@ -44,4 +44,10 @@ public class PulsarServerException extends IOException {
super(t);
}
}
+ public static class InvalidTopicNameException extends PulsarServerException {
+
+ public InvalidTopicNameException(String message) {
+ super(message);
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index fe5d1056e18..b00226e3a29 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -504,6 +505,9 @@ public abstract class AdminResource extends
PulsarWebResource {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
+ if (e.getCause() instanceof PulsarServerException.InvalidTopicNameException) {
+ throw new RestException(Status.PRECONDITION_FAILED, e.getCause().getMessage());
+ }
throw new RestException(e);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6f1ccc3b1ac..0e66d3246ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2647,6 +2647,11 @@ public class BrokerService implements Closeable {
@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata>
createDefaultPartitionedTopicAsync(TopicName topicName) {
+ if (topicName.getLocalName().contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
+ return FutureUtil.failedFuture(new PulsarServerException.
+ InvalidTopicNameException(
+ String.format("Invalid topic name: %s , should not contain -partition-", topicName)));
+ }
final int defaultNumPartitions =
pulsar.getBrokerService().getDefaultNumPartitions(topicName);
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
checkArgument(defaultNumPartitions > 0,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index b9069417704..6d3ec637785 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,14 +29,14 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
-
import java.lang.reflect.Field;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-
import com.google.common.collect.Sets;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -44,6 +44,8 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -51,6 +53,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
+import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -271,4 +274,38 @@ public class PersistentTopicTest extends BrokerTestBase {
producer.close();
}
}
+
+ @Test
+ public void testAutoCreatePartitionedTopicThatNameIncludePartition() throws Exception {
+ final String topicName = "persistent://prop/autoNs/failedcreate-partition-abcde";
+ final String ns = "prop/autoNs";
+ admin.namespaces().createNamespace(ns);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ try {
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .create();
+ Assert.fail("unexpected operation");
+ } catch (PulsarClientException ex) {
+ Assert.assertTrue(ex.getMessage()
+ .contains("Invalid topic name"));
+ }
+ Assert.assertEquals(admin.topics().getList(ns).size(), 0);
+ URI tcpLookupUrl = new URI(pulsar.getBrokerServiceUrl());
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(tcpLookupUrl.toString())
+ .build();
+ try {
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer()
+ .topic(topicName)
+ .create();
+ Assert.fail("unexpected operation");
+ } catch (PulsarClientException ex) {
+ Assert.assertTrue(ex.getMessage()
+ .contains("Invalid topic name"));
+ }
+ Assert.assertEquals(admin.topics().getList(ns).size(), 0);
+
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+ }
}