This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8c86b3d6552d03f660056e9447e3ba9ce1d77ebb Author: Shen Liu <[email protected]> AuthorDate: Tue Jan 18 12:10:07 2022 +0800 [Issue 13640][broker] Fix non-persistent topic subscription error. (#13685) Fixes #13640 If the Pulsar broker is started with `allowAutoSubscriptionCreation=false`, there is no way to subscribe to a non-persistent topic. Add an `isPersistent` check in `ServerCnx`. (cherry picked from commit 3dbe418b70e35d710fef7c8da6689e5ef6b8574a) --- .../apache/pulsar/broker/admin/v1/PersistentTopics.java | 2 +- .../apache/pulsar/broker/admin/v2/PersistentTopics.java | 2 +- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 3 ++- .../main/java/org/apache/pulsar/broker/service/Topic.java | 2 ++ .../broker/service/nonpersistent/NonPersistentTopic.java | 5 +++++ .../pulsar/broker/service/persistent/PersistentTopic.java | 5 +++++ .../service/BrokerServiceAutoSubscriptionCreationTest.java | 14 ++++++++++++++ 7 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 112273feb..df4d553 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -605,7 +605,7 @@ public class PersistentTopics extends PersistentTopicsBase { try { validateTopicName(property, cluster, namespace, topic); if (!topicName.isPersistent()) { - throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" + throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic " + "can only be done through client"); } internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index ca7e6cc..0fd307f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1326,7 +1326,7 @@ public class PersistentTopics extends PersistentTopicsBase { try { validateTopicName(tenant, namespace, topic); if (!topicName.isPersistent()) { - throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" + throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic " + "can only be done through client"); } internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 824a2e8..04d3ab5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1014,7 +1014,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { boolean rejectSubscriptionIfDoesNotExist = isDurable && !service.isAllowAutoSubscriptionCreation(topicName.toString()) - && !topic.getSubscriptions().containsKey(subscriptionName); + && !topic.getSubscriptions().containsKey(subscriptionName) + && topic.isPersistent(); if (rejectSubscriptionIfDoesNotExist) { return FutureUtil diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 2b92560..e9638f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -244,6 +244,8 @@ public interface Topic { return false; } + 
boolean isPersistent(); + /* ------ Transaction related ------ */ /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 0180f5d..15b3d51 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1075,4 +1075,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { protected boolean isTerminated() { return false; } + + @Override + public boolean isPersistent() { + return false; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 11ec5f2..eee2706 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2927,6 +2927,11 @@ public class PersistentTopic extends AbstractTopic return false; } + @Override + public boolean isPersistent() { + return true; + } + private synchronized void fence() { isFenced = true; ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java index c635968..81f9521 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java @@ -131,4 +131,18 @@ public class 
BrokerServiceAutoSubscriptionCreationTest extends BrokerTestBase { assertFalse(admin.topics().getSubscriptions(topicName.toString()).contains(subscriptionName)); } + @Test + public void testNonPersistentTopicSubscriptionCreationWithAutoCreationDisable() throws Exception { + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + + final String topicName = "non-persistent://prop/ns-abc/test-subtopic-" + testId.getAndIncrement(); + final String subscriptionName = "test-subtopic-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + + // Subscribe operation should be successful + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + } + }
