This is an automated email from the ASF dual-hosted git repository. kwang pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2b2b83d60e708d4c0e8ee304b2d556c0fc71c5d4 Author: Kai Wang <[email protected]> AuthorDate: Thu Oct 19 07:08:49 2023 -0500 [fix][broker] Fix unload operation stuck when use ExtensibleLoadManager (#21332) (cherry picked from commit ecd40e43a6b90b58e209bc9bced84b35d933619e) --- .../extensions/ExtensibleLoadManagerImpl.java | 2 +- .../channel/ServiceUnitStateChannelImpl.java | 17 +- .../extensions/manager/UnloadManager.java | 7 + .../pulsar/broker/namespace/NamespaceService.java | 4 - .../pulsar/broker/service/BrokerService.java | 15 + .../extensions/ExtensibleLoadManagerImplTest.java | 309 ++++++++++++--------- .../channel/ServiceUnitStateChannelTest.java | 12 +- 7 files changed, 218 insertions(+), 148 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 85baf9ec4fb..d3119365ddf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -304,7 +304,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { } }); }); - this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); + this.serviceUnitStateChannel = ServiceUnitStateChannelImpl.newInstance(pulsar); this.brokerRegistry.start(); this.splitManager = new SplitManager(splitCounter); this.unloadManager = new UnloadManager(unloadCounter); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index f7e09a2bec5..3cf16709cde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -200,7 +200,18 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { Unstable } + public static ServiceUnitStateChannelImpl newInstance(PulsarService pulsar) { + return new ServiceUnitStateChannelImpl(pulsar); + } + public ServiceUnitStateChannelImpl(PulsarService pulsar) { + this(pulsar, MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS, OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS); + } + + @VisibleForTesting + public ServiceUnitStateChannelImpl(PulsarService pulsar, + long inFlightStateWaitingTimeInMillis, + long ownershipMonitorDelayTimeInSecs) { this.pulsar = pulsar; this.config = pulsar.getConfig(); this.lookupServiceAddress = pulsar.getLookupServiceAddress(); @@ -210,8 +221,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { this.stateChangeListeners = new StateChangeListeners(); this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds() * 1000; - this.inFlightStateWaitingTimeInMillis = MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS; - this.ownershipMonitorDelayTimeInSecs = OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS; + this.inFlightStateWaitingTimeInMillis = inFlightStateWaitingTimeInMillis; + this.ownershipMonitorDelayTimeInSecs = ownershipMonitorDelayTimeInSecs; if (semiTerminalStateWaitingTimeInMillis < inFlightStateWaitingTimeInMillis) { throw new IllegalArgumentException( "Invalid Config: loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds < " @@ -837,7 +848,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } finally { var future = requested.getValue(); if (future != null) { - future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS) + future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS) .whenComplete((v, e) -> { if (e != null) { getOwnerRequests.remove(serviceUnit, future); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index 2dde0c4708e..ffdbbc2af42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -88,6 +88,13 @@ public class UnloadManager implements StateChangeListener { @Override public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { + if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) { + if (log.isDebugEnabled()) { + log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t); + } + this.complete(serviceUnit, t); + return; + } ServiceUnitState state = ServiceUnitStateData.state(data); switch (state) { case Free, Owned -> this.complete(serviceUnit, t); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 57c0cc7c046..c69e30173aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1558,10 +1558,6 @@ public class NamespaceService implements AutoCloseable { public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { - if (bundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) { - return FutureUtil.failedFuture(new UnsupportedOperationException( - "Ownership check for system namespace is not supported")); - } ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle) .thenApply(Optional::isPresent); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6a61e851fbc..3c0ea0f8fc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2193,6 +2193,21 @@ public class BrokerService implements Closeable { if (serviceUnit.includes(topicName)) { // Topic needs to be unloaded log.info("[{}] Unloading topic", topicName); + if (topicFuture.isCompletedExceptionally()) { + try { + topicFuture.get(); + } catch (InterruptedException | ExecutionException ex) { + if (ex.getCause() instanceof ServiceUnitNotReadyException) { + // Topic was already unloaded + if (log.isDebugEnabled()) { + log.debug("[{}] Topic was already unloaded", topicName); + } + return; + } else { + log.warn("[{}] Got exception when closing topic", topicName, ex); + } + } + } closeFutures.add(topicFuture .thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 011e7174cbe..20ba9500cb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -38,9 +38,11 @@ import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace; import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2; import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -113,6 +115,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.awaitility.Awaitility; +import org.mockito.MockedStatic; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -142,46 +145,56 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { @BeforeClass @Override public void setup() throws Exception { - conf.setForceDeleteNamespaceAllowed(true); - conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - conf.setAllowAutoTopicCreation(true); - conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); - conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - conf.setLoadBalancerSheddingEnabled(false); - conf.setLoadBalancerDebugModeEnabled(true); - conf.setTopicLevelPoliciesEnabled(false); - super.internalSetup(conf); - pulsar1 = pulsar; - ServiceConfiguration defaultConf = getDefaultConf(); - defaultConf.setAllowAutoTopicCreation(true); - defaultConf.setForceDeleteNamespaceAllowed(true); - defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); - defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - defaultConf.setLoadBalancerSheddingEnabled(false); - defaultConf.setTopicLevelPoliciesEnabled(false); - additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf); - pulsar2 = additionalPulsarTestContext.getPulsarService(); - - setPrimaryLoadManager(); - - setSecondaryLoadManager(); - - admin.clusters().createCluster(this.conf.getClusterName(), - ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); - admin.tenants().createTenant("public", - new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), - Sets.newHashSet(this.conf.getClusterName()))); - admin.namespaces().createNamespace("public/default"); - admin.namespaces().setNamespaceReplicationClusters("public/default", - Sets.newHashSet(this.conf.getClusterName())); - - admin.namespaces().createNamespace(defaultTestNamespace); - admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, - Sets.newHashSet(this.conf.getClusterName())); + try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic = + mockStatic(ServiceUnitStateChannelImpl.class)) { + channelMockedStatic.when(() -> ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class))) + .thenAnswer(invocation -> { + PulsarService pulsarService = invocation.getArgument(0); + // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid + // stuck when doing unload. + return new ServiceUnitStateChannelImpl(pulsarService, 5 * 1000, 1); + }); + conf.setForceDeleteNamespaceAllowed(true); + conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); + conf.setAllowAutoTopicCreation(true); + conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + conf.setLoadBalancerSheddingEnabled(false); + conf.setLoadBalancerDebugModeEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + super.internalSetup(conf); + pulsar1 = pulsar; + ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setAllowAutoTopicCreation(true); + defaultConf.setForceDeleteNamespaceAllowed(true); + defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + defaultConf.setLoadBalancerSheddingEnabled(false); + defaultConf.setTopicLevelPoliciesEnabled(true); + additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf); + pulsar2 = additionalPulsarTestContext.getPulsarService(); + + setPrimaryLoadManager(); + + setSecondaryLoadManager(); + + admin.clusters().createCluster(this.conf.getClusterName(), + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), + Sets.newHashSet(this.conf.getClusterName()))); + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", + Sets.newHashSet(this.conf.getClusterName())); + + admin.namespaces().createNamespace(defaultTestNamespace); + admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, + Sets.newHashSet(this.conf.getClusterName())); + } } @Override - @AfterClass + @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { pulsar1 = null; pulsar2.close(); @@ -557,119 +570,134 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { @Test public void testDeployAndRollbackLoadManager() throws Exception { - // Test rollback to modular load manager. - ServiceConfiguration defaultConf = getDefaultConf(); - defaultConf.setAllowAutoTopicCreation(true); - defaultConf.setForceDeleteNamespaceAllowed(true); - defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); - defaultConf.setLoadBalancerSheddingEnabled(false); - try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) { - // start pulsar3 with old load manager - var pulsar3 = additionalPulsarTestContext.getPulsarService(); - String topic = "persistent://" + defaultTestNamespace + "/test"; - - String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); - assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl()); - - String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic); - String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic); - assertEquals(lookupResult1, lookupResult2); - assertEquals(lookupResult1, lookupResult3); - - NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get(); - LookupOptions options = LookupOptions.builder() - .authoritative(false) - .requestHttps(false) - .readOnly(false) - .loadTopicsInBundle(false).build(); - Optional<URL> webServiceUrl1 = - pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl1.isPresent()); - assertEquals(webServiceUrl1.get().toString(), pulsar3.getWebServiceAddress()); - - Optional<URL> webServiceUrl2 = - pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl2.isPresent()); - assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); - - Optional<URL> webServiceUrl3 = - pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl3.isPresent()); - assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString()); - - List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3); - for (PulsarService pulsarService : pulsarServices) { - // Test lookup heartbeat namespace's topic - for (PulsarService pulsar : pulsarServices) { - assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); - } - // Test lookup SLA namespace's topic - for (PulsarService pulsar : pulsarServices) { - assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); - } - } - - // Test deploy new broker with new load manager - ServiceConfiguration conf = getDefaultConf(); - conf.setAllowAutoTopicCreation(true); - conf.setForceDeleteNamespaceAllowed(true); - conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); - conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) { - var pulsar4 = additionPulsarTestContext.getPulsarService(); - - Set<String> availableCandidates = Sets.newHashSet(pulsar1.getBrokerServiceUrl(), - pulsar2.getBrokerServiceUrl(), - pulsar4.getBrokerServiceUrl()); - String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic); - assertTrue(availableCandidates.contains(lookupResult4)); - - String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic); - String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic); - String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic); - assertEquals(lookupResult4, lookupResult5); - assertEquals(lookupResult4, lookupResult6); - assertEquals(lookupResult4, lookupResult7); - - Set<String> availableWebUrlCandidates = Sets.newHashSet(pulsar1.getWebServiceAddress(), - pulsar2.getWebServiceAddress(), - pulsar4.getWebServiceAddress()); - - webServiceUrl1 = + try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic = + mockStatic(ServiceUnitStateChannelImpl.class)) { + channelMockedStatic.when(() -> ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class))) + .thenAnswer(invocation -> { + PulsarService pulsarService = invocation.getArgument(0); + // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid + // stuck when doing unload. + return new ServiceUnitStateChannelImpl(pulsarService, 5 * 1000, 1); + }); + // Test rollback to modular load manager. + ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setAllowAutoTopicCreation(true); + defaultConf.setForceDeleteNamespaceAllowed(true); + defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + defaultConf.setLoadBalancerSheddingEnabled(false); + try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) { + // start pulsar3 with old load manager + var pulsar3 = additionalPulsarTestContext.getPulsarService(); + String topic = "persistent://" + defaultTestNamespace + "/test"; + + String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl()); + + String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResult1, lookupResult2); + assertEquals(lookupResult1, lookupResult3); + + NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get(); + LookupOptions options = LookupOptions.builder() + .authoritative(false) + .requestHttps(false) + .readOnly(false) + .loadTopicsInBundle(false).build(); + Optional<URL> webServiceUrl1 = pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); assertTrue(webServiceUrl1.isPresent()); - assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString())); + assertEquals(webServiceUrl1.get().toString(), pulsar3.getWebServiceAddress()); - webServiceUrl2 = + Optional<URL> webServiceUrl2 = pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); assertTrue(webServiceUrl2.isPresent()); assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); - // The pulsar3 will redirect to pulsar4 - webServiceUrl3 = + Optional<URL> webServiceUrl3 = pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); assertTrue(webServiceUrl3.isPresent()); - // It will redirect to pulsar4 - assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString())); - - var webServiceUrl4 = - pulsar4.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl4.isPresent()); - assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString()); + assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString()); - pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4); + List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3); for (PulsarService pulsarService : pulsarServices) { // Test lookup heartbeat namespace's topic for (PulsarService pulsar : pulsarServices) { - assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + assertLookupHeartbeatOwner(pulsarService, + pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); } // Test lookup SLA namespace's topic for (PulsarService pulsar : pulsarServices) { - assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + assertLookupSLANamespaceOwner(pulsarService, + pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + } + } + + // Test deploy new broker with new load manager + ServiceConfiguration conf = getDefaultConf(); + conf.setAllowAutoTopicCreation(true); + conf.setForceDeleteNamespaceAllowed(true); + conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) { + var pulsar4 = additionPulsarTestContext.getPulsarService(); + + Set<String> availableCandidates = Sets.newHashSet(pulsar1.getBrokerServiceUrl(), + pulsar2.getBrokerServiceUrl(), + pulsar4.getBrokerServiceUrl()); + String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic); + assertTrue(availableCandidates.contains(lookupResult4)); + + String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResult4, lookupResult5); + assertEquals(lookupResult4, lookupResult6); + assertEquals(lookupResult4, lookupResult7); + + Set<String> availableWebUrlCandidates = Sets.newHashSet(pulsar1.getWebServiceAddress(), + pulsar2.getWebServiceAddress(), + pulsar4.getWebServiceAddress()); + + webServiceUrl1 = + pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl1.isPresent()); + assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString())); + + webServiceUrl2 = + pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl2.isPresent()); + assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); + + // The pulsar3 will redirect to pulsar4 + webServiceUrl3 = + pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl3.isPresent()); + // It will redirect to pulsar4 + assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString())); + + var webServiceUrl4 = + pulsar4.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl4.isPresent()); + assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString()); + + pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4); + for (PulsarService pulsarService : pulsarServices) { + // Test lookup heartbeat namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupHeartbeatOwner(pulsarService, + pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + } + // Test lookup SLA namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupSLANamespaceOwner(pulsarService, + pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl()); + } } } } } + } private void assertLookupHeartbeatOwner(PulsarService pulsar, @@ -1108,6 +1136,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { NamespaceName heartbeatNamespacePulsar2V2 = NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration()); + NamespaceName slaMonitorNamespacePulsar1 = + getSLAMonitorNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration()); + + NamespaceName slaMonitorNamespacePulsar2 = + getSLAMonitorNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration()); + NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory() .getFullBundle(heartbeatNamespacePulsar1V1); NamespaceBundle bundle2 = pulsar1.getNamespaceService().getNamespaceBundleFactory() @@ -1118,27 +1152,34 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { NamespaceBundle bundle4 = pulsar2.getNamespaceService().getNamespaceBundleFactory() .getFullBundle(heartbeatNamespacePulsar2V2); + NamespaceBundle slaBundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(slaMonitorNamespacePulsar1); + NamespaceBundle slaBundle2 = pulsar2.getNamespaceService().getNamespaceBundleFactory() + .getFullBundle(slaMonitorNamespacePulsar2); + + Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits(); log.info("Owned service units: {}", ownedServiceUnitsByPulsar1); // heartbeat namespace bundle will own by pulsar1 - assertEquals(ownedServiceUnitsByPulsar1.size(), 3); assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1)); assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2)); + assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1)); Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits(); log.info("Owned service units: {}", ownedServiceUnitsByPulsar2); - assertEquals(ownedServiceUnitsByPulsar2.size(), 3); assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3)); assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4)); + assertTrue(ownedServiceUnitsByPulsar2.contains(slaBundle2)); Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 = admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress()); Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 = admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress()); - assertEquals(ownedNamespacesByPulsar1.size(), 3); assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString())); assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString())); - assertEquals(ownedNamespacesByPulsar2.size(), 3); + assertTrue(ownedNamespacesByPulsar1.containsKey(slaBundle1.toString())); + assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString())); assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString())); + assertTrue(ownedNamespacesByPulsar2.containsKey(slaBundle2.toString())); String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units"; admin.topics().createPartitionedTopic(topic, 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index f9893ea3f63..1da7059bf49 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -507,10 +507,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { assertEquals(1, getOwnerRequests1.size()); assertEquals(1, getOwnerRequests2.size()); - // In 5 secs, the getOwnerAsync requests(lookup requests) should time out. - Awaitility.await().atMost(5, TimeUnit.SECONDS) + // In 10 secs, the getOwnerAsync requests(lookup requests) should time out. + Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally())); - Awaitility.await().atMost(5, TimeUnit.SECONDS) + Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally())); assertEquals(0, getOwnerRequests1.size()); @@ -1139,10 +1139,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { assertFalse(owner1.isDone()); assertFalse(owner2.isDone()); - // In 5 secs, the getOwnerAsync requests(lookup requests) should time out. - Awaitility.await().atMost(5, TimeUnit.SECONDS) + // In 10 secs, the getOwnerAsync requests(lookup requests) should time out. + Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally())); - Awaitility.await().atMost(5, TimeUnit.SECONDS) + Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally())); // recovered, check the monitor update state : Assigned -> Owned
