This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8c86b3d6552d03f660056e9447e3ba9ce1d77ebb
Author: Shen Liu <[email protected]>
AuthorDate: Tue Jan 18 12:10:07 2022 +0800

    [Issuse 13640][broker] Fix non persistent topic subscription error. (#13685)
    
    Fixes #13640
    
    If pulsar broker started with `allowAutoSubscriptionCreation=false`, there 
are no way to subscribe non persistent topic.
    
    Add `isPersistent` check in `ServerCnx`.
    
    (cherry picked from commit 3dbe418b70e35d710fef7c8da6689e5ef6b8574a)
---
 .../apache/pulsar/broker/admin/v1/PersistentTopics.java    |  2 +-
 .../apache/pulsar/broker/admin/v2/PersistentTopics.java    |  2 +-
 .../java/org/apache/pulsar/broker/service/ServerCnx.java   |  3 ++-
 .../main/java/org/apache/pulsar/broker/service/Topic.java  |  2 ++
 .../broker/service/nonpersistent/NonPersistentTopic.java   |  5 +++++
 .../pulsar/broker/service/persistent/PersistentTopic.java  |  5 +++++
 .../service/BrokerServiceAutoSubscriptionCreationTest.java | 14 ++++++++++++++
 7 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 112273feb..df4d553 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -605,7 +605,7 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validateTopicName(property, cluster, namespace, topic);
             if (!topicName.isPersistent()) {
-                throw new RestException(Response.Status.BAD_REQUEST, "Create 
subscription on non-persistent topic"
+                throw new RestException(Response.Status.BAD_REQUEST, "Create 
subscription on non-persistent topic "
                         + "can only be done through client");
             }
             internalCreateSubscription(asyncResponse, decode(encodedSubName), 
messageId, authoritative, replicated);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ca7e6cc..0fd307f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1326,7 +1326,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
         try {
             validateTopicName(tenant, namespace, topic);
             if (!topicName.isPersistent()) {
-                throw new RestException(Response.Status.BAD_REQUEST, "Create 
subscription on non-persistent topic"
+                throw new RestException(Response.Status.BAD_REQUEST, "Create 
subscription on non-persistent topic "
                         + "can only be done through client");
             }
             internalCreateSubscription(asyncResponse, decode(encodedSubName), 
messageId, authoritative, replicated);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 824a2e8..04d3ab5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1014,7 +1014,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                                     boolean rejectSubscriptionIfDoesNotExist = 
isDurable
                                         && 
!service.isAllowAutoSubscriptionCreation(topicName.toString())
-                                        && 
!topic.getSubscriptions().containsKey(subscriptionName);
+                                        && 
!topic.getSubscriptions().containsKey(subscriptionName)
+                                        && topic.isPersistent();
 
                                     if (rejectSubscriptionIfDoesNotExist) {
                                         return FutureUtil
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 2b92560..e9638f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -244,6 +244,8 @@ public interface Topic {
         return false;
     }
 
+    boolean isPersistent();
+
     /* ------ Transaction related ------ */
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0180f5d..15b3d51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1075,4 +1075,9 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
     protected boolean isTerminated() {
         return false;
     }
+
+    @Override
+    public boolean isPersistent() {
+        return false;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 11ec5f2..eee2706 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2927,6 +2927,11 @@ public class PersistentTopic extends AbstractTopic
         return false;
     }
 
+    @Override
+    public boolean isPersistent() {
+        return true;
+    }
+
     private synchronized void fence() {
         isFenced = true;
         ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
index c635968..81f9521 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
@@ -131,4 +131,18 @@ public class BrokerServiceAutoSubscriptionCreationTest 
extends BrokerTestBase {
         
assertFalse(admin.topics().getSubscriptions(topicName.toString()).contains(subscriptionName));
     }
 
+    @Test
+    public void 
testNonPersistentTopicSubscriptionCreationWithAutoCreationDisable() throws 
Exception {
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+        final String topicName = "non-persistent://prop/ns-abc/test-subtopic-" 
+ testId.getAndIncrement();
+        final String subscriptionName = "test-subtopic-sub";
+
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Subscribe operation should be successful
+        
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+        
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+    }
+
 }

Reply via email to