This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ee3ad34ed78 [improve][broker] Save createIfMissing in TopicLoadingContext (#19993)
ee3ad34ed78 is described below

commit ee3ad34ed78f2c63ee38b9582773081583c52d1d
Author: lifepuzzlefun <[email protected]>
AuthorDate: Fri Jun 9 17:22:26 2023 +0800

    [improve][broker] Save createIfMissing in TopicLoadingContext (#19993)
---
 .../pulsar/broker/service/BrokerService.java       |  9 +++-
 .../pulsar/broker/service/BrokerServiceTest.java   | 63 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ad80f4b2068..75cf1b02eb4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1403,7 +1403,8 @@ public class BrokerService implements Closeable {
                             return null;
                         });
                     } else {
-                        pendingTopicLoadingQueue.add(new TopicLoadingContext(topic, topicFuture, properties));
+                        pendingTopicLoadingQueue.add(new TopicLoadingContext(topic,
+                                createIfMissing, topicFuture, properties));
                         if (log.isDebugEnabled()) {
                             log.debug("topic-loading for {} added into pending queue", topic);
                         }
@@ -2678,7 +2679,10 @@ public class BrokerService implements Closeable {
             CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getTopicFuture();
             final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
             final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
-            checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture, pendingTopic.getProperties());
+            checkOwnershipAndCreatePersistentTopic(topic,
+                    pendingTopic.isCreateIfMissing(),
+                    pendingFuture,
+                    pendingTopic.getProperties());
             pendingFuture.handle((persistentTopic, ex) -> {
                 // release permit and process next pending topic
                 if (acquiredPermit) {
@@ -3162,6 +3166,7 @@ public class BrokerService implements Closeable {
     @Getter
     private static class TopicLoadingContext {
         private final String topic;
+        private final boolean createIfMissing;
         private final CompletableFuture<Optional<Topic>> topicFuture;
         private final Map<String, String> properties;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 93cf004e017..dbab6996c87 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -41,6 +41,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -1108,6 +1109,68 @@ public class BrokerServiceTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Exception {
+        boolean needDeleteTopic = false;
+        final String namespace = "prop/concurrentLoad";
+        try {
+            // set up broker disable auto create and set concurrent load to 1 qps.
+            cleanup();
+            conf.setMaxConcurrentTopicLoadRequest(1);
+            conf.setAllowAutoTopicCreation(false);
+            setup();
+
+            try {
+                admin.namespaces().createNamespace(namespace);
+            } catch (PulsarAdminException.ConflictException e) {
+                // Ok.. (if test fails intermittently and namespace is already created)
+            }
+
+            // create 3 topic
+            String topicName = "persistent://" + namespace + "/my-topic";
+
+            for (int i = 0; i < 3; i++) {
+                admin.topics().createNonPartitionedTopic(topicName + "_" + i);
+            }
+
+            needDeleteTopic = true;
+
+            // try to load 10 topic
+            ArrayList<CompletableFuture<Optional<Topic>>> loadFutures = new ArrayList<>();
+            for (int i = 0; i < 10; i++) {
+                // try to create topic which should fail as bundle is disable
+                CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
+                        .loadOrCreatePersistentTopic(topicName + "_" + i, false, null);
+                loadFutures.add(futureResult);
+            }
+
+            CompletableFuture<?>[] o = (CompletableFuture<?>[]) Array.newInstance(CompletableFuture.class, 10);
+            CompletableFuture<?>[] completableFutures = loadFutures.toArray(o);
+            CompletableFuture.allOf(completableFutures).get();
+
+            // check topic load CompletableFuture. only first three topic should be success.
+            for (int i = 0; i < 10; i++) {
+                CompletableFuture<Optional<Topic>> load = loadFutures.get(i);
+                if (i < 3) {
+                    Assert.assertTrue(load.isDone());
+                    Assert.assertFalse(load.isCompletedExceptionally());
+                } else {
+                    // check topic should not be created if disable autoCreateTopic.
+                    Assert.assertTrue(load.isDone());
+                    Assert.assertTrue(load.get().isEmpty());
+                }
+            }
+        } finally {
+            if (needDeleteTopic) {
+                String topicName = "persistent://" + namespace + "/my-topic";
+
+                for (int i = 0; i < 3; i++) {
+                    admin.topics().delete(topicName + "_" + i);
+                }
+            }
+        }
+    }
+
     /**
      * Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and
      * it should not introduce deadlock while performing it.

Reply via email to