This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8add5db6c17 [fix][broker] Reject auto create partitioned topic when topic name contains ``-partition-`` (#14920)
8add5db6c17 is described below

commit 8add5db6c178422a215005dfc0751f28bc6124af
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Apr 8 15:12:24 2022 +0800

    [fix][broker] Reject auto create partitioned topic when topic name contains ``-partition-`` (#14920)
---
 .../pulsar/broker/PulsarServerException.java       |  6 ++++
 .../apache/pulsar/broker/admin/AdminResource.java  |  4 +++
 .../pulsar/broker/service/BrokerService.java       |  5 +++
 .../service/persistent/PersistentTopicTest.java    | 41 ++++++++++++++++++++--
 4 files changed, 54 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
index 1fd1d077021..40dd53b3baf 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java
@@ -44,4 +44,10 @@ public class PulsarServerException extends IOException {
             super(t);
         }
     }
+    public static class InvalidTopicNameException extends 
PulsarServerException {
+
+        public InvalidTopicNameException(String message) {
+            super(message);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index fe5d1056e18..b00226e3a29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.Response.Status;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -504,6 +505,9 @@ public abstract class AdminResource extends 
PulsarWebResource {
             if (e.getCause() instanceof RestException) {
                 throw (RestException) e.getCause();
             }
+            if (e.getCause() instanceof 
PulsarServerException.InvalidTopicNameException) {
+                throw new RestException(Status.PRECONDITION_FAILED, 
e.getCause().getMessage());
+            }
             throw new RestException(e);
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6f1ccc3b1ac..0e66d3246ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2647,6 +2647,11 @@ public class BrokerService implements Closeable {
 
     @SuppressWarnings("deprecation")
     private CompletableFuture<PartitionedTopicMetadata> 
createDefaultPartitionedTopicAsync(TopicName topicName) {
+        if 
(topicName.getLocalName().contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
+            return FutureUtil.failedFuture(new PulsarServerException.
+                    InvalidTopicNameException(
+                            String.format("Invalid topic name: %s , should not 
contain -partition-", topicName)));
+        }
         final int defaultNumPartitions = 
pulsar.getBrokerService().getDefaultNumPartitions(topicName);
         final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         checkArgument(defaultNumPartitions > 0,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index b9069417704..6d3ec637785 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,14 +29,14 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
-
 import java.lang.reflect.Field;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -44,6 +44,8 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -51,6 +53,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
+import org.junit.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -271,4 +274,38 @@ public class PersistentTopicTest extends BrokerTestBase {
             producer.close();
         }
     }
+
+    @Test
+    public void testAutoCreatePartitionedTopicThatNameIncludePartition() 
throws Exception {
+        final String topicName = 
"persistent://prop/autoNs/failedcreate-partition-abcde";
+        final String ns = "prop/autoNs";
+        admin.namespaces().createNamespace(ns);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        try {
+            @Cleanup
+            Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName)
+                    .create();
+            Assert.fail("unexpected operation");
+        } catch (PulsarClientException ex) {
+            Assert.assertTrue(ex.getMessage()
+                    .contains("Invalid topic name"));
+        }
+        Assert.assertEquals(admin.topics().getList(ns).size(), 0);
+        URI tcpLookupUrl = new URI(pulsar.getBrokerServiceUrl());
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(tcpLookupUrl.toString())
+                .build();
+        try {
+            @Cleanup
+            Producer<byte[]> producer = client.newProducer()
+                    .topic(topicName)
+                    .create();
+            Assert.fail("unexpected operation");
+        } catch (PulsarClientException ex) {
+            Assert.assertTrue(ex.getMessage()
+                    .contains("Invalid topic name"));
+        }
+        Assert.assertEquals(admin.topics().getList(ns).size(), 0);
+        
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+    }
 }

Reply via email to