This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 16aef57eefd15b7bf9036fbca9c3ff821b63ef02 Author: Shen Liu <[email protected]> AuthorDate: Tue Jan 18 12:10:07 2022 +0800 [Issuse 13640][broker] Fix non persistent topic subscription error. (#13685) Fixes #13640 ### Motivation If pulsar broker started with `allowAutoSubscriptionCreation=false`, there are no way to subscribe non persistent topic. ### Modifications Add `isPersistent` check in `ServerCnx`. (cherry picked from commit 3dbe418b70e35d710fef7c8da6689e5ef6b8574a) --- .../apache/pulsar/broker/admin/v1/PersistentTopics.java | 2 +- .../apache/pulsar/broker/admin/v2/PersistentTopics.java | 2 +- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 3 ++- .../main/java/org/apache/pulsar/broker/service/Topic.java | 2 ++ .../broker/service/nonpersistent/NonPersistentTopic.java | 5 +++++ .../pulsar/broker/service/persistent/PersistentTopic.java | 5 +++++ .../service/BrokerServiceAutoSubscriptionCreationTest.java | 14 ++++++++++++++ 7 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index c0c4b48..fcb2ba8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -638,7 +638,7 @@ public class PersistentTopics extends PersistentTopicsBase { try { validateTopicName(property, cluster, namespace, topic); if (!topicName.isPersistent()) { - throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" + throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic " + "can only be done through client"); } internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 977d54a..3b2ba8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1329,7 +1329,7 @@ public class PersistentTopics extends PersistentTopicsBase { try { validateTopicName(tenant, namespace, topic); if (!topicName.isPersistent()) { - throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" + throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic " + "can only be done through client"); } internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 583ad3c..9f317be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1008,7 +1008,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { boolean rejectSubscriptionIfDoesNotExist = isDurable && !service.isAllowAutoSubscriptionCreation(topicName.toString()) - && !topic.getSubscriptions().containsKey(subscriptionName); + && !topic.getSubscriptions().containsKey(subscriptionName) + && topic.isPersistent(); if (rejectSubscriptionIfDoesNotExist) { return FutureUtil diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 4e5c698..0e7589a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -258,6 +258,8 @@ public interface Topic { return false; } + boolean isPersistent(); + /* ------ Transaction related ------ */ /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 43245b9..808b403 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1076,4 +1076,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { protected boolean isTerminated() { return false; } + + @Override + public boolean isPersistent() { + return false; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d13e65e..3c08bd9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2929,6 +2929,11 @@ public class PersistentTopic extends AbstractTopic return false; } + @Override + public boolean isPersistent() { + return true; + } + private synchronized void fence() { isFenced = true; ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java index d0eb3bf..dc4b3d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java @@ -138,4 +138,18 @@ public class BrokerServiceAutoSubscriptionCreationTest extends BrokerTestBase { assertFalse(admin.topics().getSubscriptions(topicName.toString()).contains(subscriptionName)); } + @Test + public void testNonPersistentTopicSubscriptionCreationWithAutoCreationDisable() throws Exception { + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + + final String topicName = "non-persistent://prop/ns-abc/test-subtopic-" + testId.getAndIncrement(); + final String subscriptionName = "test-subtopic-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + + // Subscribe operation should be successful + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + } + }
