This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9733f7e9472 [fix][broker] pre-create non-partitioned system topics for load balance extension (#20370)
9733f7e9472 is described below

commit 9733f7e94727b8f47070670f496e9534aa2e17c2
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue May 23 21:26:49 2023 -0700

    [fix][broker] pre-create non-partitioned system topics for load balance extension (#20370)
    
    PIP: https://github.com/apache/pulsar/issues/16691
    
    ### Motivation
    
    System topics must be explicitly created as non-partitioned topics, because partitioned system topics are not currently supported.
    
    ### Modifications
    
    Explicitly create the system topics as non-partitioned topics.
    
    (cherry picked from commit 1080ad5c787bb317347b3f1f12b78ba3dec49757)
---
 .../extensions/ExtensibleLoadManagerImpl.java           | 17 +++++++++++++++++
 .../extensions/channel/ServiceUnitStateChannelImpl.java |  2 ++
 .../extensions/ExtensibleLoadManagerImplTest.java       |  2 ++
 .../extensions/channel/ServiceUnitStateChannelTest.java |  2 ++
 4 files changed, 23 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 4ebf537f7a8..348098df874 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -78,6 +78,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionS
 import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -212,6 +213,19 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
     }
 
+    public static void createSystemTopic(PulsarService pulsar, String topic) 
throws PulsarServerException {
+        try {
+            pulsar.getAdminClient().topics().createNonPartitionedTopic(topic);
+            log.info("Created topic {}.", topic);
+        } catch (PulsarAdminException.ConflictException ex) {
+            if (debug(pulsar.getConfiguration(), log)) {
+                log.info("Topic {} already exists.", topic, ex);
+            }
+        } catch (PulsarAdminException e) {
+            throw new PulsarServerException(e);
+        }
+    }
+
     @Override
     public void start() throws PulsarServerException {
         if (this.started) {
@@ -246,6 +260,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
         this.brokerFilterPipeline.add(new 
BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
 
+        createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
+        createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+
         try {
             this.brokerLoadDataStore = LoadDataStoreFactory
                     .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, 
BrokerLoadData.class);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index b4c4e7fd5d4..f476675d01b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -286,6 +286,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             PulsarClusterMetadataSetup.createNamespaceIfAbsent
                     (pulsar.getPulsarResources(), 
NamespaceName.SYSTEM_NAMESPACE, config.getClusterName());
 
+            ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);
+
             producer = pulsar.getClient().newProducer(schema)
                     .enableBatching(true)
                     .compressionType(MSG_COMPRESSION_TYPE)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index b71eeb4745b..d72ce9661b9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -129,6 +130,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     @Override
     public void setup() throws Exception {
         conf.setForceDeleteNamespaceAllowed(true);
+        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
         conf.setAllowAutoTopicCreation(true);
         
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 77c80187a63..0f682cf048f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -86,6 +86,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.TableViewImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -129,6 +130,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     @Override
     protected void setup() throws Exception {
         conf.setAllowAutoTopicCreation(true);
+        conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
         conf.setLoadBalancerDebugModeEnabled(true);
         conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
         super.internalSetup(conf);

Reply via email to