This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ecd40e43a6b [fix][broker] Fix unload operation stuck when use
ExtensibleLoadManager (#21332)
ecd40e43a6b is described below
commit ecd40e43a6b90b58e209bc9bced84b35d933619e
Author: Kai Wang <[email protected]>
AuthorDate: Thu Oct 19 07:08:49 2023 -0500
[fix][broker] Fix unload operation stuck when use ExtensibleLoadManager
(#21332)
---
.../extensions/ExtensibleLoadManagerImpl.java | 2 +-
.../channel/ServiceUnitStateChannelImpl.java | 17 +-
.../extensions/manager/UnloadManager.java | 7 +
.../pulsar/broker/namespace/NamespaceService.java | 4 -
.../pulsar/broker/service/BrokerService.java | 15 +
.../extensions/ExtensibleLoadManagerImplTest.java | 309 ++++++++++++---------
.../channel/ServiceUnitStateChannelTest.java | 12 +-
7 files changed, 218 insertions(+), 148 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 85baf9ec4fb..d3119365ddf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -304,7 +304,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
});
});
- this.serviceUnitStateChannel = new
ServiceUnitStateChannelImpl(pulsar);
+ this.serviceUnitStateChannel =
ServiceUnitStateChannelImpl.newInstance(pulsar);
this.brokerRegistry.start();
this.splitManager = new SplitManager(splitCounter);
this.unloadManager = new UnloadManager(unloadCounter);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f7e09a2bec5..3cf16709cde 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -200,7 +200,18 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
Unstable
}
+ public static ServiceUnitStateChannelImpl newInstance(PulsarService
pulsar) {
+ return new ServiceUnitStateChannelImpl(pulsar);
+ }
+
public ServiceUnitStateChannelImpl(PulsarService pulsar) {
+ this(pulsar, MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS,
OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS);
+ }
+
+ @VisibleForTesting
+ public ServiceUnitStateChannelImpl(PulsarService pulsar,
+ long inFlightStateWaitingTimeInMillis,
+ long ownershipMonitorDelayTimeInSecs) {
this.pulsar = pulsar;
this.config = pulsar.getConfig();
this.lookupServiceAddress = pulsar.getLookupServiceAddress();
@@ -210,8 +221,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
this.stateChangeListeners = new StateChangeListeners();
this.semiTerminalStateWaitingTimeInMillis =
config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
* 1000;
- this.inFlightStateWaitingTimeInMillis =
MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
- this.ownershipMonitorDelayTimeInSecs =
OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS;
+ this.inFlightStateWaitingTimeInMillis =
inFlightStateWaitingTimeInMillis;
+ this.ownershipMonitorDelayTimeInSecs = ownershipMonitorDelayTimeInSecs;
if (semiTerminalStateWaitingTimeInMillis <
inFlightStateWaitingTimeInMillis) {
throw new IllegalArgumentException(
"Invalid Config:
loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds < "
@@ -837,7 +848,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
} finally {
var future = requested.getValue();
if (future != null) {
- future.orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
+ future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000,
TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit,
future);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index 2dde0c4708e..ffdbbc2af42 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -88,6 +88,13 @@ public class UnloadManager implements StateChangeListener {
@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data,
Throwable t) {
+ if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Handling {} for service unit {} with exception.",
data, serviceUnit, t);
+ }
+ this.complete(serviceUnit, t);
+ return;
+ }
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 0adc2b59470..0d35e7cad69 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1558,10 +1558,6 @@ public class NamespaceService implements AutoCloseable {
public CompletableFuture<Boolean>
checkOwnershipPresentAsync(NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
- if (bundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) {
- return FutureUtil.failedFuture(new
UnsupportedOperationException(
- "Ownership check for system namespace is not
supported"));
- }
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipAsync(Optional.empty(),
bundle)
.thenApply(Optional::isPresent);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b85f77cb2f5..4642ef19b37 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2220,6 +2220,21 @@ public class BrokerService implements Closeable {
if (serviceUnit.includes(topicName)) {
// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
+ if (topicFuture.isCompletedExceptionally()) {
+ try {
+ topicFuture.get();
+ } catch (InterruptedException | ExecutionException ex) {
+ if (ex.getCause() instanceof
ServiceUnitNotReadyException) {
+ // Topic was already unloaded
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic was already unloaded",
topicName);
+ }
+ return;
+ } else {
+ log.warn("[{}] Got exception when closing topic",
topicName, ex);
+ }
+ }
+ }
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ?
t.get().close(closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 011e7174cbe..20ba9500cb1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -38,9 +38,11 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
import static
org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace;
import static
org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2;
import static
org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace;
+import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -113,6 +115,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -142,46 +145,56 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@BeforeClass
@Override
public void setup() throws Exception {
- conf.setForceDeleteNamespaceAllowed(true);
- conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- conf.setAllowAutoTopicCreation(true);
-
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- conf.setLoadBalancerSheddingEnabled(false);
- conf.setLoadBalancerDebugModeEnabled(true);
- conf.setTopicLevelPoliciesEnabled(false);
- super.internalSetup(conf);
- pulsar1 = pulsar;
- ServiceConfiguration defaultConf = getDefaultConf();
- defaultConf.setAllowAutoTopicCreation(true);
- defaultConf.setForceDeleteNamespaceAllowed(true);
-
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- defaultConf.setLoadBalancerSheddingEnabled(false);
- defaultConf.setTopicLevelPoliciesEnabled(false);
- additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
- pulsar2 = additionalPulsarTestContext.getPulsarService();
-
- setPrimaryLoadManager();
-
- setSecondaryLoadManager();
-
- admin.clusters().createCluster(this.conf.getClusterName(),
-
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
- Sets.newHashSet(this.conf.getClusterName())));
- admin.namespaces().createNamespace("public/default");
- admin.namespaces().setNamespaceReplicationClusters("public/default",
- Sets.newHashSet(this.conf.getClusterName()));
-
- admin.namespaces().createNamespace(defaultTestNamespace);
-
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
- Sets.newHashSet(this.conf.getClusterName()));
+ try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
+ mockStatic(ServiceUnitStateChannelImpl.class)) {
+ channelMockedStatic.when(() ->
ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
+ .thenAnswer(invocation -> {
+ PulsarService pulsarService =
invocation.getArgument(0);
+ // Set the inflight state waiting time and ownership
monitor delay time to 5 seconds to avoid
+ // stuck when doing unload.
+ return new ServiceUnitStateChannelImpl(pulsarService,
5 * 1000, 1);
+ });
+ conf.setForceDeleteNamespaceAllowed(true);
+ conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ conf.setAllowAutoTopicCreation(true);
+
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ conf.setLoadBalancerSheddingEnabled(false);
+ conf.setLoadBalancerDebugModeEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ super.internalSetup(conf);
+ pulsar1 = pulsar;
+ ServiceConfiguration defaultConf = getDefaultConf();
+ defaultConf.setAllowAutoTopicCreation(true);
+ defaultConf.setForceDeleteNamespaceAllowed(true);
+
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ defaultConf.setLoadBalancerSheddingEnabled(false);
+ defaultConf.setTopicLevelPoliciesEnabled(true);
+ additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
+ pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+ setPrimaryLoadManager();
+
+ setSecondaryLoadManager();
+
+ admin.clusters().createCluster(this.conf.getClusterName(),
+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+ Sets.newHashSet(this.conf.getClusterName())));
+ admin.namespaces().createNamespace("public/default");
+
admin.namespaces().setNamespaceReplicationClusters("public/default",
+ Sets.newHashSet(this.conf.getClusterName()));
+
+ admin.namespaces().createNamespace(defaultTestNamespace);
+
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+ Sets.newHashSet(this.conf.getClusterName()));
+ }
}
@Override
- @AfterClass
+ @AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
pulsar1 = null;
pulsar2.close();
@@ -557,119 +570,134 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test
public void testDeployAndRollbackLoadManager() throws Exception {
- // Test rollback to modular load manager.
- ServiceConfiguration defaultConf = getDefaultConf();
- defaultConf.setAllowAutoTopicCreation(true);
- defaultConf.setForceDeleteNamespaceAllowed(true);
-
defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
- defaultConf.setLoadBalancerSheddingEnabled(false);
- try (var additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf)) {
- // start pulsar3 with old load manager
- var pulsar3 = additionalPulsarTestContext.getPulsarService();
- String topic = "persistent://" + defaultTestNamespace + "/test";
-
- String lookupResult1 =
pulsar3.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
-
- String lookupResult2 =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult3 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResult1, lookupResult2);
- assertEquals(lookupResult1, lookupResult3);
-
- NamespaceBundle bundle = getBundleAsync(pulsar1,
TopicName.get(topic)).get();
- LookupOptions options = LookupOptions.builder()
- .authoritative(false)
- .requestHttps(false)
- .readOnly(false)
- .loadTopicsInBundle(false).build();
- Optional<URL> webServiceUrl1 =
- pulsar1.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl1.isPresent());
- assertEquals(webServiceUrl1.get().toString(),
pulsar3.getWebServiceAddress());
-
- Optional<URL> webServiceUrl2 =
- pulsar2.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl2.isPresent());
- assertEquals(webServiceUrl2.get().toString(),
webServiceUrl1.get().toString());
-
- Optional<URL> webServiceUrl3 =
- pulsar3.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl3.isPresent());
- assertEquals(webServiceUrl3.get().toString(),
webServiceUrl1.get().toString());
-
- List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2,
pulsar3);
- for (PulsarService pulsarService : pulsarServices) {
- // Test lookup heartbeat namespace's topic
- for (PulsarService pulsar : pulsarServices) {
- assertLookupHeartbeatOwner(pulsarService,
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
- }
- // Test lookup SLA namespace's topic
- for (PulsarService pulsar : pulsarServices) {
- assertLookupSLANamespaceOwner(pulsarService,
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
- }
- }
-
- // Test deploy new broker with new load manager
- ServiceConfiguration conf = getDefaultConf();
- conf.setAllowAutoTopicCreation(true);
- conf.setForceDeleteNamespaceAllowed(true);
-
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- try (var additionPulsarTestContext =
createAdditionalPulsarTestContext(conf)) {
- var pulsar4 = additionPulsarTestContext.getPulsarService();
-
- Set<String> availableCandidates =
Sets.newHashSet(pulsar1.getBrokerServiceUrl(),
- pulsar2.getBrokerServiceUrl(),
- pulsar4.getBrokerServiceUrl());
- String lookupResult4 =
pulsar4.getAdminClient().lookups().lookupTopic(topic);
- assertTrue(availableCandidates.contains(lookupResult4));
-
- String lookupResult5 =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult6 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult7 =
pulsar3.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResult4, lookupResult5);
- assertEquals(lookupResult4, lookupResult6);
- assertEquals(lookupResult4, lookupResult7);
-
- Set<String> availableWebUrlCandidates =
Sets.newHashSet(pulsar1.getWebServiceAddress(),
- pulsar2.getWebServiceAddress(),
- pulsar4.getWebServiceAddress());
-
- webServiceUrl1 =
+ try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
+ mockStatic(ServiceUnitStateChannelImpl.class)) {
+ channelMockedStatic.when(() ->
ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
+ .thenAnswer(invocation -> {
+ PulsarService pulsarService =
invocation.getArgument(0);
+ // Set the inflight state waiting time and ownership
monitor delay time to 5 seconds to avoid
+ // stuck when doing unload.
+ return new ServiceUnitStateChannelImpl(pulsarService,
5 * 1000, 1);
+ });
+ // Test rollback to modular load manager.
+ ServiceConfiguration defaultConf = getDefaultConf();
+ defaultConf.setAllowAutoTopicCreation(true);
+ defaultConf.setForceDeleteNamespaceAllowed(true);
+
defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ defaultConf.setLoadBalancerSheddingEnabled(false);
+ try (var additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf)) {
+ // start pulsar3 with old load manager
+ var pulsar3 = additionalPulsarTestContext.getPulsarService();
+ String topic = "persistent://" + defaultTestNamespace +
"/test";
+
+ String lookupResult1 =
pulsar3.getAdminClient().lookups().lookupTopic(topic);
+ assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
+
+ String lookupResult2 =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+ String lookupResult3 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+ assertEquals(lookupResult1, lookupResult2);
+ assertEquals(lookupResult1, lookupResult3);
+
+ NamespaceBundle bundle = getBundleAsync(pulsar1,
TopicName.get(topic)).get();
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(false)
+ .requestHttps(false)
+ .readOnly(false)
+ .loadTopicsInBundle(false).build();
+ Optional<URL> webServiceUrl1 =
pulsar1.getNamespaceService().getWebServiceUrl(bundle,
options);
assertTrue(webServiceUrl1.isPresent());
-
assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+ assertEquals(webServiceUrl1.get().toString(),
pulsar3.getWebServiceAddress());
- webServiceUrl2 =
+ Optional<URL> webServiceUrl2 =
pulsar2.getNamespaceService().getWebServiceUrl(bundle,
options);
assertTrue(webServiceUrl2.isPresent());
assertEquals(webServiceUrl2.get().toString(),
webServiceUrl1.get().toString());
- // The pulsar3 will redirect to pulsar4
- webServiceUrl3 =
+ Optional<URL> webServiceUrl3 =
pulsar3.getNamespaceService().getWebServiceUrl(bundle,
options);
assertTrue(webServiceUrl3.isPresent());
- // It will redirect to pulsar4
-
assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
-
- var webServiceUrl4 =
- pulsar4.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl4.isPresent());
- assertEquals(webServiceUrl4.get().toString(),
webServiceUrl1.get().toString());
+ assertEquals(webServiceUrl3.get().toString(),
webServiceUrl1.get().toString());
- pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+ List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2,
pulsar3);
for (PulsarService pulsarService : pulsarServices) {
// Test lookup heartbeat namespace's topic
for (PulsarService pulsar : pulsarServices) {
- assertLookupHeartbeatOwner(pulsarService,
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ assertLookupHeartbeatOwner(pulsarService,
+ pulsar.getLookupServiceAddress(),
pulsar.getBrokerServiceUrl());
}
// Test lookup SLA namespace's topic
for (PulsarService pulsar : pulsarServices) {
- assertLookupSLANamespaceOwner(pulsarService,
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ assertLookupSLANamespaceOwner(pulsarService,
+ pulsar.getLookupServiceAddress(),
pulsar.getBrokerServiceUrl());
+ }
+ }
+
+ // Test deploy new broker with new load manager
+ ServiceConfiguration conf = getDefaultConf();
+ conf.setAllowAutoTopicCreation(true);
+ conf.setForceDeleteNamespaceAllowed(true);
+
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ try (var additionPulsarTestContext =
createAdditionalPulsarTestContext(conf)) {
+ var pulsar4 = additionPulsarTestContext.getPulsarService();
+
+ Set<String> availableCandidates =
Sets.newHashSet(pulsar1.getBrokerServiceUrl(),
+ pulsar2.getBrokerServiceUrl(),
+ pulsar4.getBrokerServiceUrl());
+ String lookupResult4 =
pulsar4.getAdminClient().lookups().lookupTopic(topic);
+ assertTrue(availableCandidates.contains(lookupResult4));
+
+ String lookupResult5 =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+ String lookupResult6 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+ String lookupResult7 =
pulsar3.getAdminClient().lookups().lookupTopic(topic);
+ assertEquals(lookupResult4, lookupResult5);
+ assertEquals(lookupResult4, lookupResult6);
+ assertEquals(lookupResult4, lookupResult7);
+
+ Set<String> availableWebUrlCandidates =
Sets.newHashSet(pulsar1.getWebServiceAddress(),
+ pulsar2.getWebServiceAddress(),
+ pulsar4.getWebServiceAddress());
+
+ webServiceUrl1 =
+
pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webServiceUrl1.isPresent());
+
assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+
+ webServiceUrl2 =
+
pulsar2.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webServiceUrl2.isPresent());
+ assertEquals(webServiceUrl2.get().toString(),
webServiceUrl1.get().toString());
+
+ // The pulsar3 will redirect to pulsar4
+ webServiceUrl3 =
+
pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webServiceUrl3.isPresent());
+ // It will redirect to pulsar4
+
assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
+
+ var webServiceUrl4 =
+
pulsar4.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webServiceUrl4.isPresent());
+ assertEquals(webServiceUrl4.get().toString(),
webServiceUrl1.get().toString());
+
+ pulsarServices = List.of(pulsar1, pulsar2, pulsar3,
pulsar4);
+ for (PulsarService pulsarService : pulsarServices) {
+ // Test lookup heartbeat namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupHeartbeatOwner(pulsarService,
+ pulsar.getLookupServiceAddress(),
pulsar.getBrokerServiceUrl());
+ }
+ // Test lookup SLA namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupSLANamespaceOwner(pulsarService,
+ pulsar.getLookupServiceAddress(),
pulsar.getBrokerServiceUrl());
+ }
}
}
}
}
+
}
private void assertLookupHeartbeatOwner(PulsarService pulsar,
@@ -1108,6 +1136,12 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
NamespaceName heartbeatNamespacePulsar2V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(),
pulsar2.getConfiguration());
+ NamespaceName slaMonitorNamespacePulsar1 =
+ getSLAMonitorNamespace(pulsar1.getLookupServiceAddress(),
pulsar1.getConfiguration());
+
+ NamespaceName slaMonitorNamespacePulsar2 =
+ getSLAMonitorNamespace(pulsar2.getLookupServiceAddress(),
pulsar2.getConfiguration());
+
NamespaceBundle bundle1 =
pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V1);
NamespaceBundle bundle2 =
pulsar1.getNamespaceService().getNamespaceBundleFactory()
@@ -1118,27 +1152,34 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
NamespaceBundle bundle4 =
pulsar2.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar2V2);
+ NamespaceBundle slaBundle1 =
pulsar1.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(slaMonitorNamespacePulsar1);
+ NamespaceBundle slaBundle2 =
pulsar2.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(slaMonitorNamespacePulsar2);
+
+
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 =
primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
// heartbeat namespace bundle will own by pulsar1
- assertEquals(ownedServiceUnitsByPulsar1.size(), 3);
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
+ assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1));
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 =
secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
- assertEquals(ownedServiceUnitsByPulsar2.size(), 3);
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
+ assertTrue(ownedServiceUnitsByPulsar2.contains(slaBundle2));
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(),
pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(),
pulsar2.getLookupServiceAddress());
- assertEquals(ownedNamespacesByPulsar1.size(), 3);
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
- assertEquals(ownedNamespacesByPulsar2.size(), 3);
+
assertTrue(ownedNamespacesByPulsar1.containsKey(slaBundle1.toString()));
+
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));
+
assertTrue(ownedNamespacesByPulsar2.containsKey(slaBundle2.toString()));
String topic = "persistent://" + defaultTestNamespace +
"/test-get-owned-service-units";
admin.topics().createPartitionedTopic(topic, 1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index ad58d0b4883..57d4537bdeb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -507,10 +507,10 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(1, getOwnerRequests1.size());
assertEquals(1, getOwnerRequests2.size());
- // In 5 secs, the getOwnerAsync requests(lookup requests) should time
out.
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ // In 10 secs, the getOwnerAsync requests(lookup requests) should time
out.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertTrue(owner1.isCompletedExceptionally()));
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertTrue(owner2.isCompletedExceptionally()));
assertEquals(0, getOwnerRequests1.size());
@@ -1139,10 +1139,10 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertFalse(owner1.isDone());
assertFalse(owner2.isDone());
- // In 5 secs, the getOwnerAsync requests(lookup requests) should time
out.
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ // In 10 secs, the getOwnerAsync requests(lookup requests) should time
out.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertTrue(owner1.isCompletedExceptionally()));
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertTrue(owner2.isCompletedExceptionally()));
// recovered, check the monitor update state : Assigned -> Owned