This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 16aef57eefd15b7bf9036fbca9c3ff821b63ef02
Author: Shen Liu <[email protected]>
AuthorDate: Tue Jan 18 12:10:07 2022 +0800

    [Issuse 13640][broker] Fix non persistent topic subscription error. (#13685)
    
    Fixes #13640
    
    ### Motivation
    
    If pulsar broker started with `allowAutoSubscriptionCreation=false`, there 
are no way to subscribe non persistent topic.
    
    ### Modifications
    
    Add `isPersistent` check in `ServerCnx`.
    
    (cherry picked from commit 3dbe418b70e35d710fef7c8da6689e5ef6b8574a)
---
 .../apache/pulsar/broker/admin/v1/PersistentTopics.java    |  2 +-
 .../apache/pulsar/broker/admin/v2/PersistentTopics.java    |  2 +-
 .../java/org/apache/pulsar/broker/service/ServerCnx.java   |  3 ++-
 .../main/java/org/apache/pulsar/broker/service/Topic.java  |  2 ++
 .../broker/service/nonpersistent/NonPersistentTopic.java   |  5 +++++
 .../pulsar/broker/service/persistent/PersistentTopic.java  |  5 +++++
 .../service/BrokerServiceAutoSubscriptionCreationTest.java | 14 ++++++++++++++
 7 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index c0c4b48..fcb2ba8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -638,7 +638,7 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validateTopicName(property, cluster, namespace, topic);
             if (!topicName.isPersistent()) {
-                throw new RestException(Response.Status.BAD_REQUEST, "Create 
subscription on non-persistent topic"
+                throw new RestException(Response.Status.BAD_REQUEST, "Create 
subscription on non-persistent topic "
                         + "can only be done through client");
             }
             internalCreateSubscription(asyncResponse, decode(encodedSubName), 
messageId, authoritative, replicated);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 977d54a..3b2ba8d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1329,7 +1329,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
         try {
             validateTopicName(tenant, namespace, topic);
             if (!topicName.isPersistent()) {
-                throw new RestException(Response.Status.BAD_REQUEST, "Create 
subscription on non-persistent topic"
+                throw new RestException(Response.Status.BAD_REQUEST, "Create 
subscription on non-persistent topic "
                         + "can only be done through client");
             }
             internalCreateSubscription(asyncResponse, decode(encodedSubName), 
messageId, authoritative, replicated);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 583ad3c..9f317be 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1008,7 +1008,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                             boolean rejectSubscriptionIfDoesNotExist = 
isDurable
                                 && 
!service.isAllowAutoSubscriptionCreation(topicName.toString())
-                                && 
!topic.getSubscriptions().containsKey(subscriptionName);
+                                && 
!topic.getSubscriptions().containsKey(subscriptionName)
+                                && topic.isPersistent();
 
                             if (rejectSubscriptionIfDoesNotExist) {
                                 return FutureUtil
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 4e5c698..0e7589a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -258,6 +258,8 @@ public interface Topic {
         return false;
     }
 
+    boolean isPersistent();
+
     /* ------ Transaction related ------ */
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 43245b9..808b403 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1076,4 +1076,9 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
     protected boolean isTerminated() {
         return false;
     }
+
+    @Override
+    public boolean isPersistent() {
+        return false;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d13e65e..3c08bd9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2929,6 +2929,11 @@ public class PersistentTopic extends AbstractTopic
         return false;
     }
 
+    @Override
+    public boolean isPersistent() {
+        return true;
+    }
+
     private synchronized void fence() {
         isFenced = true;
         ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
index d0eb3bf..dc4b3d9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
@@ -138,4 +138,18 @@ public class BrokerServiceAutoSubscriptionCreationTest 
extends BrokerTestBase {
         
assertFalse(admin.topics().getSubscriptions(topicName.toString()).contains(subscriptionName));
     }
 
+    @Test
+    public void 
testNonPersistentTopicSubscriptionCreationWithAutoCreationDisable() throws 
Exception {
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+        final String topicName = "non-persistent://prop/ns-abc/test-subtopic-" 
+ testId.getAndIncrement();
+        final String subscriptionName = "test-subtopic-sub";
+
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Subscribe operation should be successful
+        
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+        
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
+    }
+
 }

Reply via email to