This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new a8068c8  Not allow sub auto create by admin when disable topic auto 
create (#6685)
a8068c8 is described below

commit a8068c80e1dd4690b831301fe8e55be06c96d030
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Apr 8 22:20:37 2020 +0800

    Not allow sub auto create by admin when disable topic auto create (#6685)
    
    ### Motivation
    
    Creating a subscription through the admin API must not auto-create the
    topic when topic auto-creation is disabled.
    
    ### Modifications
    
    - Change the admin code so that creating a subscription no longer
      auto-creates the topic when topic auto-creation is disabled.
    - Add tests.
    
    ### Verifying this change
    Unit tests passed.
    
    * fix sub auto created by admin
    * fix test error: create the topic partitions when updating a partitioned topic
    * fix flaky test
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  2 +-
 .../broker/admin/impl/PersistentTopicsBase.java    | 12 ++++--
 .../BrokerServiceAutoTopicCreationTest.java        | 44 ++++++++++++++++++++++
 .../pulsar/client/api/PartitionCreationTest.java   |  2 +-
 4 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3805d53..42e7a7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -282,7 +282,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                     log.info("[{}] Topic partition {} is exists, doing 
nothing.", clientAppId(),
                         topicName.getPartition(partition));
-                    
result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
+                    result.complete(null);
                 } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
                     log.warn("[{}] Fail to create topic partition {} with 
concurrent modification, retry now.",
                             clientAppId(), topicName.getPartition(partition));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7186330..a1e22ad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -522,11 +523,12 @@ public class PersistentTopicsBase extends AdminResource {
             }
             return;
         }
-        
+
         if (numPartitions <= 0) {
             throw new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 0");
         }
         try {
+            tryCreatePartitionsAsync(numPartitions).get();
             updatePartitionedTopic(topicName, numPartitions).get();
         } catch (Exception e) {
             if (e.getCause() instanceof RestException) {
@@ -1501,7 +1503,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isPartitioned()) {
             internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, 
subscriptionName, targetMessageId, authoritative, replicated);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).thenAccept(partitionMetadata -> {
+            boolean allowAutoTopicCreation = 
pulsar().getConfiguration().isAllowAutoTopicCreation();
+            getPartitionedTopicMetadataAsync(topicName, authoritative, 
allowAutoTopicCreation).thenAccept(partitionMetadata -> {
                 final int numPartitions = partitionMetadata.partitions;
                 if (numPartitions > 0) {
                     final CompletableFuture<Void> future = new 
CompletableFuture<>();
@@ -1578,7 +1581,10 @@ public class PersistentTopicsBase extends AdminResource {
               MessageIdImpl targetMessageId, boolean authoritative, boolean 
replicated) {
         try {
             validateAdminAccessForSubscriber(subscriptionName, authoritative);
-            PersistentTopic topic = (PersistentTopic) 
getOrCreateTopic(topicName);
+
+            boolean isAllowAutoTopicCreation = 
pulsar().getConfiguration().isAllowAutoTopicCreation();
+            PersistentTopic topic = (PersistentTopic) 
pulsar().getBrokerService()
+                    .getTopic(topicName.toString(), 
isAllowAutoTopicCreation).thenApply(Optional::get).join();
             if (topic.getSubscriptions().containsKey(subscriptionName)) {
                 asyncResponse.resume(new RestException(Status.CONFLICT, 
"Subscription already exists for topic"));
                 return;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 929f9c2..6f537da 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -120,4 +120,48 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         assertEquals(partitions, 0);
         
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
     }
+
+
+    @Test
+    public void testNotAllowSubscriptionTopicCreation() throws Exception{
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+        String topicName = "persistent://prop/ns-abc/non-partitioned-topic" + 
System.currentTimeMillis();
+        String subscriptionName = "non-partitioned-topic-sub";
+
+        try {
+            admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+            fail("should fail to create subscription once not 
allowAutoTopicCreation");
+        } catch (Exception e) {
+            // expected
+        }
+
+        try {
+            admin.topics().createSubscription(topicName + "-partition-0",
+                    subscriptionName, MessageId.earliest);
+            fail("should fail to create subscription once not 
allowAutoTopicCreation");
+        } catch (Exception e) {
+            // expected
+        }
+
+        
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+        
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+
+        try {
+            admin.topics().createNonPartitionedTopic(topicName);
+            admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+        } catch (Exception e) {
+            // expected
+            fail("should success to create subscription once topic created");
+        }
+
+        try {
+            String partitionTopic = 
"persistent://prop/ns-abc/partitioned-topic" + System.currentTimeMillis();
+            admin.topics().createPartitionedTopic(partitionTopic, 1);
+            admin.topics().createSubscription(partitionTopic + "-partition-0", 
subscriptionName, MessageId.earliest);
+        } catch (Exception e) {
+            // expected
+            fail("should success to create subscription once topic created");
+        }
+
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index 2647bbb..5e1ad1d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -104,7 +104,7 @@ public class PartitionCreationTest extends 
ProducerConsumerBase {
     }
 
     @Test
-    public void 
testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() 
throws PulsarClientException, PulsarAdminException {
+    public void 
testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() 
throws Exception {
         conf.setAllowAutoTopicCreation(false);
         final String topic = 
"testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation";
         admin.topics().createPartitionedTopic(topic, 3);

Reply via email to