This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9733f7e9472 [fix][broker] pre-create non-partitioned system topics for
load balance extension (#20370)
9733f7e9472 is described below
commit 9733f7e94727b8f47070670f496e9534aa2e17c2
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue May 23 21:26:49 2023 -0700
[fix][broker] pre-create non-partitioned system topics for load balance
extension (#20370)
PIP: https://github.com/apache/pulsar/issues/16691
### Motivation
We need to create system topics without partitions explicitly. Currently,
we do not support partitioned system topics.
### Modifications
create system topics without partitions explicitly
(cherry picked from commit 1080ad5c787bb317347b3f1f12b78ba3dec49757)
---
.../extensions/ExtensibleLoadManagerImpl.java | 17 +++++++++++++++++
.../extensions/channel/ServiceUnitStateChannelImpl.java | 2 ++
.../extensions/ExtensibleLoadManagerImplTest.java | 2 ++
.../extensions/channel/ServiceUnitStateChannelTest.java | 2 ++
4 files changed, 23 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 4ebf537f7a8..348098df874 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -78,6 +78,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionS
import
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -212,6 +213,19 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
}
+ public static void createSystemTopic(PulsarService pulsar, String topic)
throws PulsarServerException {
+ try {
+ pulsar.getAdminClient().topics().createNonPartitionedTopic(topic);
+ log.info("Created topic {}.", topic);
+ } catch (PulsarAdminException.ConflictException ex) {
+ if (debug(pulsar.getConfiguration(), log)) {
+ log.info("Topic {} already exists.", topic, ex);
+ }
+ } catch (PulsarAdminException e) {
+ throw new PulsarServerException(e);
+ }
+ }
+
@Override
public void start() throws PulsarServerException {
if (this.started) {
@@ -246,6 +260,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new
BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
+ createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
+ createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+
try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC,
BrokerLoadData.class);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index b4c4e7fd5d4..f476675d01b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -286,6 +286,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
PulsarClusterMetadataSetup.createNamespaceIfAbsent
(pulsar.getPulsarResources(),
NamespaceName.SYSTEM_NAMESPACE, config.getClusterName());
+ ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);
+
producer = pulsar.getClient().newProducer(schema)
.enableBatching(true)
.compressionType(MSG_COMPRESSION_TYPE)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index b71eeb4745b..d72ce9661b9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -129,6 +130,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setForceDeleteNamespaceAllowed(true);
+ conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 77c80187a63..0f682cf048f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -86,6 +86,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
@@ -129,6 +130,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
@Override
protected void setup() throws Exception {
conf.setAllowAutoTopicCreation(true);
+ conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
super.internalSetup(conf);