This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f77e1553ae3d9d4e735b5474025b582c3ca609f2
Author: Kai Wang <[email protected]>
AuthorDate: Thu Apr 18 09:48:14 2024 +0800

    [fix][broker] Check the broker is available for the SLA monitor bundle when the ExtensibleLoadManager is enabled (#22485)
    
    (cherry picked from commit d0b9d471d53d2db600b55a04d6255688d1fd2d27)
---
 .../extensions/ExtensibleLoadManagerImpl.java      | 39 +++++++-----------
 .../pulsar/broker/namespace/NamespaceService.java  | 47 +++++++++++++++++-----
 .../extensions/ExtensibleLoadManagerImplTest.java  | 43 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 35 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 2a644a20a46..677ca2ddae0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -506,30 +506,20 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
             if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
                 owner = serviceUnitStateChannel.getChannelOwnerAsync();
             } else {
-                String candidateBrokerId = 
getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
-                if (candidateBrokerId != null) {
-                    owner = 
CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
-                } else {
-                    owner = getOrSelectOwnerAsync(serviceUnit, 
bundle).thenApply(Optional::ofNullable);
-                }
+                owner = 
getHeartbeatOrSLAMonitorBrokerId(serviceUnit).thenCompose(candidateBrokerId -> {
+                    if (candidateBrokerId != null) {
+                        return 
CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
+                    }
+                    return getOrSelectOwnerAsync(serviceUnit, 
bundle).thenApply(Optional::ofNullable);
+                });
             }
             return getBrokerLookupData(owner, bundle);
         });
     }
 
-    private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) 
{
-        // Check if this is Heartbeat or SLAMonitor namespace
-        String candidateBroker = 
NamespaceService.checkHeartbeatNamespace(serviceUnit);
-        if (candidateBroker == null) {
-            candidateBroker = 
NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
-        }
-        if (candidateBroker == null) {
-            candidateBroker = 
NamespaceService.getSLAMonitorBrokerName(serviceUnit);
-        }
-        if (candidateBroker != null) {
-            return candidateBroker.substring(candidateBroker.lastIndexOf('/') 
+ 1);
-        }
-        return candidateBroker;
+    private CompletableFuture<String> 
getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit) {
+        return 
pulsar.getNamespaceService().getHeartbeatOrSLAMonitorBrokerId(serviceUnit,
+                cb -> 
brokerRegistry.lookupAsync(cb).thenApply(Optional::isPresent));
     }
 
     private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId 
serviceUnit,
@@ -676,11 +666,12 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
             return serviceUnitStateChannel.getChannelOwnerAsync();
         }
-        String candidateBroker = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
-        if (candidateBroker != null) {
-            return 
CompletableFuture.completedFuture(Optional.of(candidateBroker));
-        }
-        return serviceUnitStateChannel.getOwnerAsync(bundle);
+        return 
getHeartbeatOrSLAMonitorBrokerId(serviceUnit).thenCompose(candidateBroker -> {
+            if (candidateBroker != null) {
+                return 
CompletableFuture.completedFuture(Optional.of(candidateBroker));
+            }
+            return serviceUnitStateChannel.getOwnerAsync(bundle);
+        });
     }
 
     public CompletableFuture<Optional<BrokerLookupData>> 
getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 622331e8fbe..2f75e364ea5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -496,6 +497,38 @@ public class NamespaceService implements AutoCloseable {
         });
     }
 
+    /**
+     * Check if this is Heartbeat or SLAMonitor namespace and return the 
broker id.
+     *
+     * @param serviceUnit the service unit
+     * @param isBrokerActive the function to check if the broker is active
+     * @return the broker id
+     */
+    public CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(
+            ServiceUnitId serviceUnit, Function<String, 
CompletableFuture<Boolean>> isBrokerActive) {
+        String candidateBroker = 
NamespaceService.checkHeartbeatNamespace(serviceUnit);
+        if (candidateBroker != null) {
+            return CompletableFuture.completedFuture(candidateBroker);
+        }
+        candidateBroker = 
NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
+        if (candidateBroker != null) {
+            return CompletableFuture.completedFuture(candidateBroker);
+        }
+        candidateBroker = 
NamespaceService.getSLAMonitorBrokerName(serviceUnit);
+        if (candidateBroker != null) {
+            // Check if the broker is available
+            final String finalCandidateBroker = candidateBroker;
+            return isBrokerActive.apply(candidateBroker).thenApply(isActive -> 
{
+                if (isActive) {
+                    return finalCandidateBroker;
+                } else {
+                    return null;
+                }
+            });
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
     private void searchForCandidateBroker(NamespaceBundle bundle,
                                           
CompletableFuture<Optional<LookupResult>> lookupFuture,
                                           LookupOptions options) {
@@ -523,17 +556,9 @@ public class NamespaceService implements AutoCloseable {
 
         try {
             // check if this is Heartbeat or SLAMonitor namespace
-            candidateBroker = checkHeartbeatNamespace(bundle);
-            if (candidateBroker == null) {
-                candidateBroker = checkHeartbeatNamespaceV2(bundle);
-            }
-            if (candidateBroker == null) {
-                String broker = getSLAMonitorBrokerName(bundle);
-                // checking if the broker is up and running
-                if (broker != null && isBrokerActive(broker)) {
-                    candidateBroker = broker;
-                }
-            }
+            candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
+                    CompletableFuture.completedFuture(isBrokerActive(cb)))
+                    .get(config.getMetadataStoreOperationTimeoutSeconds(), 
SECONDS);
 
             if (candidateBroker == null) {
                 Optional<LeaderBroker> currentLeader = 
pulsar.getLeaderElectionService().getCurrentLeader();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index c85eeed0e22..f9e2eb5f9de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -50,6 +50,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -113,6 +114,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -982,6 +984,47 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
                                     pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
                         }
                     }
+                    // Check if the broker is available
+                    var wrapper = (ExtensibleLoadManagerWrapper) 
pulsar4.getLoadManager().get();
+                    var loadManager4 = spy((ExtensibleLoadManagerImpl)
+                            FieldUtils.readField(wrapper, "loadManager", 
true));
+                    loadManager4.getBrokerRegistry().unregister();
+
+                    NamespaceName slaMonitorNamespace =
+                            getSLAMonitorNamespace(pulsar4.getBrokerId(), 
pulsar.getConfiguration());
+                    String slaMonitorTopic = 
slaMonitorNamespace.getPersistentTopicName("test");
+                    String result = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(result);
+                    log.info("{} Namespace is re-owned by {}", 
slaMonitorTopic, result);
+                    assertNotEquals(result, pulsar4.getBrokerServiceUrl());
+
+                    Producer<String> producer = 
pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create();
+                    producer.send("t1");
+
+                    // Test re-register broker and check the lookup result
+                    loadManager4.getBrokerRegistry().register();
+
+                    result = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(result);
+                    log.info("{} Namespace is re-owned by {}", 
slaMonitorTopic, result);
+                    assertEquals(result, pulsar4.getBrokerServiceUrl());
+
+                    producer.send("t2");
+                    Producer<String> producer1 = 
pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create();
+                    producer1.send("t3");
+
+                    producer.close();
+                    producer1.close();
+                    @Cleanup
+                    Consumer<String> consumer = 
pulsar.getClient().newConsumer(Schema.STRING)
+                            .topic(slaMonitorTopic)
+                            
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                            .subscriptionName("test")
+                            .subscribe();
+                    // receive message t1 t2 t3
+                    assertEquals(consumer.receive().getValue(), "t1");
+                    assertEquals(consumer.receive().getValue(), "t2");
+                    assertEquals(consumer.receive().getValue(), "t3");
                 }
             }
     }

Reply via email to