This is an automated email from the ASF dual-hosted git repository. heesung pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f77e1553ae3d9d4e735b5474025b582c3ca609f2 Author: Kai Wang <[email protected]> AuthorDate: Thu Apr 18 09:48:14 2024 +0800 [fix][broker] Check the broker is available for the SLA monitor bundle when the ExtensibleLoadManager is enabled (#22485) (cherry picked from commit d0b9d471d53d2db600b55a04d6255688d1fd2d27) --- .../extensions/ExtensibleLoadManagerImpl.java | 39 +++++++----------- .../pulsar/broker/namespace/NamespaceService.java | 47 +++++++++++++++++----- .../extensions/ExtensibleLoadManagerImplTest.java | 43 ++++++++++++++++++++ 3 files changed, 94 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 2a644a20a46..677ca2ddae0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -506,30 +506,20 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { if (topic.isPresent() && isInternalTopic(topic.get().toString())) { owner = serviceUnitStateChannel.getChannelOwnerAsync(); } else { - String candidateBrokerId = getHeartbeatOrSLAMonitorBrokerId(serviceUnit); - if (candidateBrokerId != null) { - owner = CompletableFuture.completedFuture(Optional.of(candidateBrokerId)); - } else { - owner = getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable); - } + owner = getHeartbeatOrSLAMonitorBrokerId(serviceUnit).thenCompose(candidateBrokerId -> { + if (candidateBrokerId != null) { + return CompletableFuture.completedFuture(Optional.of(candidateBrokerId)); + } + return getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable); + }); } return getBrokerLookupData(owner, bundle); }); } - private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) { - // Check if this is Heartbeat or SLAMonitor namespace - String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit); - if (candidateBroker == null) { - candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); - } - if (candidateBroker == null) { - candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); - } - if (candidateBroker != null) { - return candidateBroker.substring(candidateBroker.lastIndexOf('/') + 1); - } - return candidateBroker; + private CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) { + return pulsar.getNamespaceService().getHeartbeatOrSLAMonitorBrokerId(serviceUnit, + cb -> brokerRegistry.lookupAsync(cb).thenApply(Optional::isPresent)); } private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUnit, @@ -676,11 +666,12 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { if (topic.isPresent() && isInternalTopic(topic.get().toString())) { return serviceUnitStateChannel.getChannelOwnerAsync(); } - String candidateBroker = getHeartbeatOrSLAMonitorBrokerId(serviceUnit); - if (candidateBroker != null) { - return CompletableFuture.completedFuture(Optional.of(candidateBroker)); - } - return serviceUnitStateChannel.getOwnerAsync(bundle); + return getHeartbeatOrSLAMonitorBrokerId(serviceUnit).thenCompose(candidateBroker -> { + if (candidateBroker != null) { + return CompletableFuture.completedFuture(Optional.of(candidateBroker)); + } + return serviceUnitStateChannel.getOwnerAsync(bundle); + }); } public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 622331e8fbe..2f75e364ea5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -496,6 +497,38 @@ public class NamespaceService implements AutoCloseable { }); } + /** + * Check if this is Heartbeat or SLAMonitor namespace and return the broker id. + * + * @param serviceUnit the service unit + * @param isBrokerActive the function to check if the broker is active + * @return the broker id + */ + public CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId( + ServiceUnitId serviceUnit, Function<String, CompletableFuture<Boolean>> isBrokerActive) { + String candidateBroker = NamespaceService.checkHeartbeatNamespace(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(candidateBroker); + } + candidateBroker = NamespaceService.checkHeartbeatNamespaceV2(serviceUnit); + if (candidateBroker != null) { + return CompletableFuture.completedFuture(candidateBroker); + } + candidateBroker = NamespaceService.getSLAMonitorBrokerName(serviceUnit); + if (candidateBroker != null) { + // Check if the broker is available + final String finalCandidateBroker = candidateBroker; + return isBrokerActive.apply(candidateBroker).thenApply(isActive -> { + if (isActive) { + return finalCandidateBroker; + } else { + return null; + } + }); + } + return CompletableFuture.completedFuture(null); + } + private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<Optional<LookupResult>> lookupFuture, LookupOptions options) { @@ -523,17 +556,9 @@ public class NamespaceService implements AutoCloseable { try { // check if this is Heartbeat or SLAMonitor namespace - candidateBroker = checkHeartbeatNamespace(bundle); - if (candidateBroker == null) { - candidateBroker = checkHeartbeatNamespaceV2(bundle); - } - if (candidateBroker == null) { - String broker = getSLAMonitorBrokerName(bundle); - // checking if the broker is up and running - if (broker != null && isBrokerActive(broker)) { - candidateBroker = broker; - } - } + candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb -> + CompletableFuture.completedFuture(isBrokerActive(cb))) + .get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS); if (candidateBroker == null) { Optional<LeaderBroker> currentLeader = pulsar.getLeaderElectionService().getCurrentLeader(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index c85eeed0e22..f9e2eb5f9de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -50,6 +50,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -113,6 +114,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -982,6 +984,47 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); } } + // Check if the broker is available + var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); + var loadManager4 = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + loadManager4.getBrokerRegistry().unregister(); + + NamespaceName slaMonitorNamespace = + getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); + String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); + String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(result); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + assertNotEquals(result, pulsar4.getBrokerServiceUrl()); + + Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + producer.send("t1"); + + // Test re-register broker and check the lookup result + loadManager4.getBrokerRegistry().register(); + + result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(result); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + assertEquals(result, pulsar4.getBrokerServiceUrl()); + + producer.send("t2"); + Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + producer1.send("t3"); + + producer.close(); + producer1.close(); + @Cleanup + Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING) + .topic(slaMonitorTopic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + // receive message t1 t2 t3 + assertEquals(consumer.receive().getValue(), "t1"); + assertEquals(consumer.receive().getValue(), "t2"); + assertEquals(consumer.receive().getValue(), "t3"); } } }
