This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f43dde5f9f9 [fix][broker] Skip topic auto-creation for
ExtensibleLoadManager internal topics (#21729)
f43dde5f9f9 is described below
commit f43dde5f9f9d4c3f0820fcd8f56d546c3e1ff2a3
Author: Heesung Sohn <[email protected]>
AuthorDate: Fri Dec 15 12:34:59 2023 -0800
[fix][broker] Skip topic auto-creation for ExtensibleLoadManager internal
topics (#21729)
(cherry picked from commit 88df040ed34e6863f2c255ace1b050030a3d54e7)
---
.../extensions/ExtensibleLoadManagerImpl.java | 2 +-
.../pulsar/broker/service/BrokerService.java | 8 +--
.../BrokerServiceAutoTopicCreationTest.java | 63 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index d3119365ddf..37ca29da260 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -732,7 +732,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
}
- private boolean isInternalTopic(String topic) {
+ public static boolean isInternalTopic(String topic) {
return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 045f4d6467a..c87402f9253 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -103,7 +103,7 @@ import
org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
-import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
@@ -3378,10 +3378,10 @@ public class BrokerService implements Closeable {
return CompletableFuture.completedFuture(false);
}
- // ServiceUnitStateChannelImpl.TOPIC expects to be a
non-partitioned-topic now.
+ // ExtensibleLoadManagerImpl internal topics are expected to be
non-partitioned topics now.
// We don't allow the auto-creation here.
- // ServiceUnitStateChannelImpl.start() is responsible to create the
topic.
- if (ServiceUnitStateChannelImpl.TOPIC.equals(topicName.toString())) {
+ // ExtensibleLoadManagerImpl.start() is responsible for creating the
internal system topics.
+ if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
return CompletableFuture.completedFuture(false);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index a28b60bbae3..0a6cffc7685 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -25,18 +25,27 @@ import static org.testng.Assert.fail;
import java.util.List;
+import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -526,4 +535,58 @@ public class BrokerServiceAutoTopicCreationTest extends
BrokerTestBase{
}
}
+ @Test
+ public void testExtensibleLoadManagerImplInternalTopicAutoCreations()
+ throws PulsarAdminException, PulsarClientException {
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+ pulsar.getConfiguration().setDefaultNumPartitions(3);
+ pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(5);
+ final String namespaceName = NamespaceName.SYSTEM_NAMESPACE.toString();
+ TenantInfoImpl tenantInfo = new TenantInfoImpl();
+ tenantInfo.setAllowedClusters(Set.of(configClusterName));
+ admin.tenants().createTenant("pulsar", tenantInfo);
+ admin.namespaces().createNamespace(namespaceName);
+
admin.topics().createNonPartitionedTopic(ServiceUnitStateChannelImpl.TOPIC);
+
admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC);
+
admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+
+ // clear the topics to test the auto creation of non-persistent topics.
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
topics =
+ pulsar.getBrokerService().getTopics();
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
oldTopics = new ConcurrentOpenHashMap<>();
+ topics.forEach((key, val) -> oldTopics.put(key, val));
+ topics.clear();
+
+ // The created persistent topic can be correctly found by
+ //
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+ Producer producer =
pulsarClient.newProducer().topic(ServiceUnitStateChannelImpl.TOPIC).create();
+
+ // The created non-persistent topics cannot be found, because we called
topics.clear()
+ try {
+
pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC).create();
+ Assert.fail("Create should have failed.");
+ } catch (PulsarClientException.TopicDoesNotExistException e) {
+ // expected
+ }
+ try {
+
pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC).create();
+ Assert.fail("Create should have failed.");
+ } catch (PulsarClientException.TopicDoesNotExistException e) {
+ // expected
+ }
+
+ oldTopics.forEach((key, val) -> topics.put(key, val));
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespaceName);
+ assertEquals(partitionedTopicList.size(), 0);
+ });
+
+ producer.close();
+ admin.namespaces().deleteNamespace(namespaceName);
+ admin.tenants().deleteTenant("pulsar");
+
+ }
+
}