This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new be7e890d91c [improve][broker] PIP-192: Update the lookup data path to support deploy and rollback (#19999)
be7e890d91c is described below
commit be7e890d91cbb28014280b088de35b3a3bf513b8
Author: Kai Wang <[email protected]>
AuthorDate: Wed Apr 5 22:32:48 2023 +0800
[improve][broker] PIP-192: Update the lookup data path to support deploy and rollback (#19999)
Co-authored-by: Jiwe Guo <[email protected]>
---
.../loadbalance/extensions/BrokerRegistryImpl.java | 2 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 30 +++++++++++++++++++++-
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index de0d361316d..4c2beba5d30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.metadata.api.coordination.ResourceLock;
@Slf4j
public class BrokerRegistryImpl implements BrokerRegistry {
- protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+ protected static final String LOOKUP_DATA_PATH = "/loadbalance/extension/brokers";
private final PulsarService pulsar;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index a9b145e3029..57590d8d7da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -166,7 +167,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
this.additionalPulsarTestContext.close();
}
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
protected void initializeState() throws PulsarAdminException {
admin.namespaces().unload("public/default");
reset(primaryLoadManager, secondaryLoadManager);
@@ -470,6 +471,33 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
}
+ @Test
+ public void testStartOldLoadManager() throws Exception {
+ ServiceConfiguration defaultConf = getDefaultConf();
+ defaultConf.setAllowAutoTopicCreation(true);
+ defaultConf.setForceDeleteNamespaceAllowed(true);
+ defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ defaultConf.setLoadBalancerSheddingEnabled(false);
+ try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
+ // start pulsar3 with old load manager
+ var pulsar3 = additionalPulsarTestContext.getPulsarService();
+
+ var availableBrokers = pulsar3.getLoadManager().get().getAvailableBrokers();
+ assertEquals(availableBrokers.size(), 1);
+ assertEquals(availableBrokers.iterator().next(), pulsar3.getLookupServiceAddress());
+
+ availableBrokers = pulsar1.getLoadManager().get().getAvailableBrokers();
+ assertEquals(availableBrokers.size(), 2);
+ assertTrue(availableBrokers.contains(pulsar1.getLookupServiceAddress()));
+ assertTrue(availableBrokers.contains(pulsar2.getLookupServiceAddress()));
+
+ availableBrokers = pulsar2.getLoadManager().get().getAvailableBrokers();
+ assertEquals(availableBrokers.size(), 2);
+ assertTrue(availableBrokers.contains(pulsar1.getLookupServiceAddress()));
+ assertTrue(availableBrokers.contains(pulsar2.getLookupServiceAddress()));
+ }
+ }
+
@Test
public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception {
var topBundlesLoadDataStorePrimary =