This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ee3ad34ed78 [improve][broker] Save createIfMissing in
TopicLoadingContext (#19993)
ee3ad34ed78 is described below
commit ee3ad34ed78f2c63ee38b9582773081583c52d1d
Author: lifepuzzlefun <[email protected]>
AuthorDate: Fri Jun 9 17:22:26 2023 +0800
[improve][broker] Save createIfMissing in TopicLoadingContext (#19993)
---
.../pulsar/broker/service/BrokerService.java | 9 +++-
.../pulsar/broker/service/BrokerServiceTest.java | 63 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ad80f4b2068..75cf1b02eb4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1403,7 +1403,8 @@ public class BrokerService implements Closeable {
return null;
});
} else {
- pendingTopicLoadingQueue.add(new
TopicLoadingContext(topic, topicFuture, properties));
+ pendingTopicLoadingQueue.add(new
TopicLoadingContext(topic,
+ createIfMissing, topicFuture, properties));
if (log.isDebugEnabled()) {
log.debug("topic-loading for {} added into pending
queue", topic);
}
@@ -2678,7 +2679,10 @@ public class BrokerService implements Closeable {
CompletableFuture<Optional<Topic>> pendingFuture =
pendingTopic.getTopicFuture();
final Semaphore topicLoadSemaphore =
topicLoadRequestSemaphore.get();
final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
- checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture,
pendingTopic.getProperties());
+ checkOwnershipAndCreatePersistentTopic(topic,
+ pendingTopic.isCreateIfMissing(),
+ pendingFuture,
+ pendingTopic.getProperties());
pendingFuture.handle((persistentTopic, ex) -> {
// release permit and process next pending topic
if (acquiredPermit) {
@@ -3162,6 +3166,7 @@ public class BrokerService implements Closeable {
@Getter
private static class TopicLoadingContext {
private final String topic;
+ private final boolean createIfMissing;
private final CompletableFuture<Optional<Topic>> topicFuture;
private final Map<String, String> properties;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 93cf004e017..dbab6996c87 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -41,6 +41,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -1108,6 +1109,68 @@ public class BrokerServiceTest extends BrokerTestBase {
}
}
+ @Test
+ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated()
throws Exception {
+ boolean needDeleteTopic = false;
+ final String namespace = "prop/concurrentLoad";
+ try {
+ // set up broker disable auto create and set concurrent load to 1
qps.
+ cleanup();
+ conf.setMaxConcurrentTopicLoadRequest(1);
+ conf.setAllowAutoTopicCreation(false);
+ setup();
+
+ try {
+ admin.namespaces().createNamespace(namespace);
+ } catch (PulsarAdminException.ConflictException e) {
+ // Ok.. (if test fails intermittently and namespace is already
created)
+ }
+
+ // create 3 topic
+ String topicName = "persistent://" + namespace + "/my-topic";
+
+ for (int i = 0; i < 3; i++) {
+ admin.topics().createNonPartitionedTopic(topicName + "_" + i);
+ }
+
+ needDeleteTopic = true;
+
+ // try to load 10 topic
+ ArrayList<CompletableFuture<Optional<Topic>>> loadFutures = new
ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ // try to create topic which should fail as bundle is disable
+ CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService()
+ .loadOrCreatePersistentTopic(topicName + "_" + i,
false, null);
+ loadFutures.add(futureResult);
+ }
+
+ CompletableFuture<?>[] o = (CompletableFuture<?>[])
Array.newInstance(CompletableFuture.class, 10);
+ CompletableFuture<?>[] completableFutures = loadFutures.toArray(o);
+ CompletableFuture.allOf(completableFutures).get();
+
+ // check topic load CompletableFuture. only first three topic
should be success.
+ for (int i = 0; i < 10; i++) {
+ CompletableFuture<Optional<Topic>> load = loadFutures.get(i);
+ if (i < 3) {
+ Assert.assertTrue(load.isDone());
+ Assert.assertFalse(load.isCompletedExceptionally());
+ } else {
+ // check topic should not be created if disable
autoCreateTopic.
+ Assert.assertTrue(load.isDone());
+ Assert.assertTrue(load.get().isEmpty());
+ }
+ }
+ } finally {
+ if (needDeleteTopic) {
+ String topicName = "persistent://" + namespace + "/my-topic";
+
+ for (int i = 0; i < 3; i++) {
+ admin.topics().delete(topicName + "_" + i);
+ }
+ }
+ }
+ }
+
/**
* Verifies brokerService should not have deadlock and successfully remove
topic from topicMap on topic-failure and
* it should not introduce deadlock while performing it.