This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new acb5a54f3f1 [fix][broker] Cleanup correctly heartbeat bundle ownership 
when handling broker deletion event (#21083)
acb5a54f3f1 is described below

commit acb5a54f3f194b7a07062100670e94c0330e2a21
Author: Kai Wang <[email protected]>
AuthorDate: Mon Sep 4 17:49:35 2023 +0800

    [fix][broker] Cleanup correctly heartbeat bundle ownership when handling 
broker deletion event (#21083)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      |  5 +-
 .../channel/ServiceUnitStateChannelImpl.java       | 10 +--
 .../pulsar/broker/namespace/NamespaceService.java  | 38 ++++------
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  2 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |  2 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  |  8 +--
 .../channel/ServiceUnitStateChannelTest.java       | 81 +++++++++++++++++-----
 .../broker/namespace/NamespaceServiceTest.java     |  2 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |  6 +-
 .../broker/service/InactiveTopicDeleteTest.java    |  6 +-
 .../systopic/PartitionedSystemTopicTest.java       |  6 +-
 .../apache/pulsar/compaction/CompactionTest.java   |  4 +-
 12 files changed, 106 insertions(+), 64 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index a61a2c940b8..d37a9eda1ab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -397,9 +397,10 @@ public class BrokersBase extends AdminResource {
 
 
     private CompletableFuture<Void> internalRunHealthCheck(TopicVersion 
topicVersion) {
+        String lookupServiceAddress = pulsar().getLookupServiceAddress();
         NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
-                ? 
NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration())
-                : 
NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration());
+                ? 
NamespaceService.getHeartbeatNamespaceV2(lookupServiceAddress, 
pulsar().getConfiguration())
+                : NamespaceService.getHeartbeatNamespace(lookupServiceAddress, 
pulsar().getConfiguration());
         final String topicName = String.format("persistent://%s/%s", 
namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
         LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), 
topicName);
         final String messageStr = UUID.randomUUID().toString();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 4eb25848fda..e9fbf625edd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -41,6 +41,8 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
 import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import static 
org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -92,6 +94,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
@@ -1213,10 +1216,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
         String heartbeatNamespace =
-                
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), 
pulsar.getConfiguration())
-                        .toString();
-        String heartbeatNamespaceV2 = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
-                pulsar.getConfiguration()).toString();
+                NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, 
config.getClusterName(), broker)).toString();
+        String heartbeatNamespaceV2 =
+                NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, 
broker)).toString();
 
         Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new 
HashMap<>();
         for (var etr : tableview.entrySet()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 141e48515b2..abf96361408 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -132,8 +132,8 @@ public class NamespaceService implements AutoCloseable {
     public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = 
Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
     public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = 
Pattern.compile("pulsar/([^:]+:\\d+)");
     public static final Pattern SLA_NAMESPACE_PATTERN = 
Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
-    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
-    public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s:%s";
+    public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
+    public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
     public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + 
"/%s/%s:%s";
 
     private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> 
namespaceClients;
@@ -159,7 +159,7 @@ public class NamespaceService implements AutoCloseable {
      */
     public NamespaceService(PulsarService pulsar) {
         this.pulsar = pulsar;
-        host = pulsar.getAdvertisedAddress();
+        this.host = pulsar.getAdvertisedAddress();
         this.config = pulsar.getConfiguration();
         this.loadManager = pulsar.getLoadManager();
         this.bundleFactory = new NamespaceBundleFactory(pulsar, 
Hashing.crc32());
@@ -327,15 +327,17 @@ public class NamespaceService implements AutoCloseable {
      * @throws PulsarServerException
      */
     public void registerBootstrapNamespaces() throws PulsarServerException {
-
+        String lookupServiceAddress = pulsar.getLookupServiceAddress();
         // ensure that we own the heartbeat namespace
-        if (registerNamespace(getHeartbeatNamespace(host, config), true)) {
-            LOG.info("added heartbeat namespace name in local cache: ns={}", 
getHeartbeatNamespace(host, config));
+        if (registerNamespace(getHeartbeatNamespace(lookupServiceAddress, 
config), true)) {
+            LOG.info("added heartbeat namespace name in local cache: ns={}",
+                    getHeartbeatNamespace(lookupServiceAddress, config));
         }
 
         // ensure that we own the heartbeat namespace
-        if (registerNamespace(getHeartbeatNamespaceV2(host, config), true)) {
-            LOG.info("added heartbeat namespace name in local cache: ns={}", 
getHeartbeatNamespaceV2(host, config));
+        if (registerNamespace(getHeartbeatNamespaceV2(lookupServiceAddress, 
config), true)) {
+            LOG.info("added heartbeat namespace name in local cache: ns={}",
+                    getHeartbeatNamespaceV2(lookupServiceAddress, config));
         }
 
         // we may not need strict ownership checking for bootstrap names for 
now
@@ -1566,24 +1568,12 @@ public class NamespaceService implements AutoCloseable {
         LOG.info("Namespace {} unloaded successfully", namespaceName);
     }
 
-    public static NamespaceName getHeartbeatNamespace(String host, 
ServiceConfiguration config) {
-        Integer port = null;
-        if (config.getWebServicePort().isPresent()) {
-            port = config.getWebServicePort().get();
-        } else if (config.getWebServicePortTls().isPresent()) {
-            port = config.getWebServicePortTls().get();
-        }
-        return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, 
config.getClusterName(), host, port));
+    public static NamespaceName getHeartbeatNamespace(String lookupBroker, 
ServiceConfiguration config) {
+        return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT, 
config.getClusterName(), lookupBroker));
     }
 
-    public static NamespaceName getHeartbeatNamespaceV2(String host, 
ServiceConfiguration config) {
-        Integer port = null;
-        if (config.getWebServicePort().isPresent()) {
-            port = config.getWebServicePort().get();
-        } else if (config.getWebServicePortTls().isPresent()) {
-            port = config.getWebServicePortTls().get();
-        }
-        return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, 
host, port));
+    public static NamespaceName getHeartbeatNamespaceV2(String lookupBroker, 
ServiceConfiguration config) {
+        return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, 
lookupBroker));
     }
 
     public static NamespaceName getSLAMonitorNamespace(String host, 
ServiceConfiguration config) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f9ca8cae625..83f27c3e9d8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -536,7 +536,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         for (String ns : nsMap.keySet()) {
             NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
             if (ns.equals(
-                    
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), 
pulsar.getConfiguration())
+                    
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), 
pulsar.getConfiguration())
                             + "/0x00000000_0xffffffff")) {
                 assertEquals(nsStatus.broker_assignment, 
BrokerAssignment.shared);
                 assertFalse(nsStatus.is_controlled);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index b2052cdcbf0..f8614631a9f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -452,7 +452,7 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         for (String ns : nsMap.keySet()) {
             NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
             if (ns.equals(
-                    
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), 
pulsar.getConfiguration())
+                    
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), 
pulsar.getConfiguration())
                             + "/0x00000000_0xffffffff")) {
                 assertEquals(nsStatus.broker_assignment, 
BrokerAssignment.shared);
                 assertFalse(nsStatus.is_controlled);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 410c10eeac7..76202b1b0ae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1027,14 +1027,14 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     @Test(timeOut = 30 * 1000)
     public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws 
Exception {
         NamespaceName heartbeatNamespacePulsar1V1 =
-                
NamespaceService.getHeartbeatNamespace(pulsar1.getAdvertisedAddress(), 
pulsar1.getConfiguration());
+                
NamespaceService.getHeartbeatNamespace(pulsar1.getLookupServiceAddress(), 
pulsar1.getConfiguration());
         NamespaceName heartbeatNamespacePulsar1V2 =
-                
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getAdvertisedAddress(), 
pulsar1.getConfiguration());
+                
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(), 
pulsar1.getConfiguration());
 
         NamespaceName heartbeatNamespacePulsar2V1 =
-                
NamespaceService.getHeartbeatNamespace(pulsar2.getAdvertisedAddress(), 
pulsar2.getConfiguration());
+                
NamespaceService.getHeartbeatNamespace(pulsar2.getLookupServiceAddress(), 
pulsar2.getConfiguration());
         NamespaceName heartbeatNamespacePulsar2V2 =
-                
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getAdvertisedAddress(), 
pulsar2.getConfiguration());
+                
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), 
pulsar2.getConfiguration());
 
         NamespaceBundle bundle1 = 
pulsar1.getNamespaceService().getNamespaceBundleFactory()
                 .getFullBundle(heartbeatNamespacePulsar1V1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index d8b4f1d75d2..caf7f0d5d5e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -30,6 +30,8 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
 import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -87,6 +89,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.TableViewImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -747,11 +750,41 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         String broker = lookupServiceAddress1;
         channel1.publishAssignEventAsync(bundle1, broker);
         channel2.publishAssignEventAsync(bundle2, broker);
+
         waitUntilNewOwner(channel1, bundle1, broker);
         waitUntilNewOwner(channel2, bundle1, broker);
         waitUntilNewOwner(channel1, bundle2, broker);
         waitUntilNewOwner(channel2, bundle2, broker);
 
+        // Register the broker-1 heartbeat namespace bundle.
+        String heartbeatNamespaceBroker1V1 = NamespaceName
+                .get(String.format(HEARTBEAT_NAMESPACE_FMT, 
conf.getClusterName(), broker)).toString();
+        String heartbeatNamespaceBroker1V2 = NamespaceName
+                .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, 
broker)).toString();
+        String heartbeatNamespaceBroker1V1Bundle = heartbeatNamespaceBroker1V1 
+ "/0x00000000_0xfffffff0";
+        String heartbeatNamespaceBroker1V2Bundle = heartbeatNamespaceBroker1V2 
+ "/0x00000000_0xfffffff0";
+        channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V1Bundle, 
broker);
+        channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V2Bundle, 
broker);
+
+        // Register the broker-2 heartbeat namespace bundle.
+        String heartbeatNamespaceBroker2V1 = NamespaceName
+                .get(String.format(HEARTBEAT_NAMESPACE_FMT, 
conf.getClusterName(), lookupServiceAddress2)).toString();
+        String heartbeatNamespaceBroker2V2 = NamespaceName
+                .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2, 
lookupServiceAddress2)).toString();
+        String heartbeatNamespaceBroker2V1Bundle = heartbeatNamespaceBroker2V1 
+ "/0x00000000_0xfffffff0";
+        String heartbeatNamespaceBroker2V2Bundle = heartbeatNamespaceBroker2V2 
+ "/0x00000000_0xfffffff0";
+        channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V1Bundle, 
lookupServiceAddress2);
+        channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V2Bundle, 
lookupServiceAddress2);
+        waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, broker);
+        waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, broker);
+        waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, broker);
+        waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, broker);
+        waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, 
lookupServiceAddress2);
+        waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, 
lookupServiceAddress2);
+        waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, 
lookupServiceAddress2);
+        waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, 
lookupServiceAddress2);
+
+        // Verify to transfer the ownership to the other broker.
         channel1.publishUnloadEventAsync(new Unload(broker, bundle1, 
Optional.of(lookupServiceAddress2)));
         waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
         waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
@@ -765,12 +798,24 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS 
* 1000 + 1000), true);
         leaderChannel.handleBrokerRegistrationEvent(broker, 
NotificationType.Deleted);
         followerChannel.handleBrokerRegistrationEvent(broker, 
NotificationType.Deleted);
+        leaderChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, 
NotificationType.Deleted);
+        followerChannel.handleBrokerRegistrationEvent(lookupServiceAddress2, 
NotificationType.Deleted);
 
         waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
         waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
         waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
         waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
 
+        waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, null);
+        waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, null);
+        waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, null);
+        waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, null);
+
+        waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, null);
+        waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, null);
+        waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, null);
+        waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, null);
+
         verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
         verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), 
any());
 
@@ -780,11 +825,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         });
 
         validateMonitorCounters(leaderChannel,
-                1,
+                2,
                 0,
-                1,
+                7,
                 0,
-                1,
+                2,
                 0,
                 0);
 
@@ -811,11 +856,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         });
 
         validateMonitorCounters(leaderChannel,
-                1,
+                2,
                 0,
-                1,
+                7,
                 0,
-                2,
+                3,
                 0,
                 0);
 
@@ -832,11 +877,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         });
 
         validateMonitorCounters(leaderChannel,
-                1,
+                2,
                 0,
-                1,
+                7,
                 0,
-                2,
+                3,
                 0,
                 1);
 
@@ -854,11 +899,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         });
 
         validateMonitorCounters(leaderChannel,
-                1,
+                2,
                 0,
-                1,
+                7,
                 0,
-                3,
+                4,
                 0,
                 1);
 
@@ -876,11 +921,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         });
 
         validateMonitorCounters(leaderChannel,
-                2,
-                0,
                 3,
                 0,
-                3,
+                9,
+                0,
+                4,
                 0,
                 1);
 
@@ -905,11 +950,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         });
 
         validateMonitorCounters(leaderChannel,
-                2,
-                0,
                 3,
                 0,
-                3,
+                9,
+                0,
+                4,
                 1,
                 1);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index ac5d92c8802..03bb53eb9da 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -683,7 +683,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
     @Test
     public void testHeartbeatNamespaceMatch() throws Exception {
-        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
+        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), conf);
         NamespaceBundle namespaceBundle = 
pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
         assertTrue(NamespaceService.isSystemServiceNamespace(
                         
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 04018d5fb9d..2489aa5f026 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1538,8 +1538,10 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         
assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_ASSIGN));
         assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_LOG));
-        NamespaceName heartbeatNamespaceV1 = 
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), 
pulsar.getConfig());
-        NamespaceName heartbeatNamespaceV2 = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), 
pulsar.getConfig());
+        NamespaceName heartbeatNamespaceV1 = NamespaceService
+                .getHeartbeatNamespace(pulsar.getLookupServiceAddress(), 
pulsar.getConfig());
+        NamespaceName heartbeatNamespaceV2 = NamespaceService
+                .getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), 
pulsar.getConfig());
         assertTrue(brokerService.isSystemTopic("persistent://" + 
heartbeatNamespaceV1.toString() + "/healthcheck"));
         assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString() 
+ "/healthcheck"));
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index df58188da3e..ddb4e30b9ac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -597,10 +597,12 @@ public class InactiveTopicDeleteTest extends 
BrokerTestBase {
         conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
         super.baseSetup();
         // init topic
-        NamespaceName heartbeatNamespaceV1 = 
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), 
pulsar.getConfig());
+        NamespaceName heartbeatNamespaceV1 = NamespaceService
+                .getHeartbeatNamespace(pulsar.getLookupServiceAddress(), 
pulsar.getConfig());
         final String healthCheckTopicV1 = "persistent://" + 
heartbeatNamespaceV1 + "/healthcheck";
 
-        NamespaceName heartbeatNamespaceV2 = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), 
pulsar.getConfig());
+        NamespaceName heartbeatNamespaceV2 = NamespaceService
+                .getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), 
pulsar.getConfig());
         final String healthCheckTopicV2 = "persistent://" + 
heartbeatNamespaceV2 + "/healthcheck";
 
         admin.brokers().healthcheck(TopicVersion.V1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 34e7dfe92e5..4af0bd90523 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -164,7 +164,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
 
     @Test
     public void testHealthCheckTopicNotOffload() throws Exception {
-        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
                 pulsar.getConfig());
         TopicName topicName = TopicName.get("persistent", namespaceName, 
BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
         PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
@@ -184,7 +184,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
     @Test
     public void testSystemNamespaceNotCreateChangeEventsTopic() throws 
Exception {
         admin.brokers().healthcheck(TopicVersion.V2);
-        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
                 pulsar.getConfig());
         TopicName topicName = TopicName.get("persistent", namespaceName, 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
         Optional<Topic> optionalTopic = pulsar.getBrokerService()
@@ -195,7 +195,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
     @Test
     public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
         admin.brokers().healthcheck(TopicVersion.V2);
-        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
                 pulsar.getConfig());
         TopicName topicName = TopicName.get("persistent", namespaceName, 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
         for (int partition = 0; partition < PARTITIONS; partition ++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c5dbd9c49aa..e603b3ccc4d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1771,9 +1771,9 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     @SneakyThrows
     @Test
     public void testHealthCheckTopicNotCompacted() {
-        NamespaceName heartbeatNamespaceV1 = 
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), 
pulsar.getConfiguration());
+        NamespaceName heartbeatNamespaceV1 = 
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), 
pulsar.getConfiguration());
         String topicV1 = "persistent://" + heartbeatNamespaceV1.toString() + 
"/healthcheck";
-        NamespaceName heartbeatNamespaceV2 = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), 
pulsar.getConfiguration());
+        NamespaceName heartbeatNamespaceV2 = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), 
pulsar.getConfiguration());
         String topicV2 = heartbeatNamespaceV2.toString() + "/healthcheck";
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicV1).create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicV2).create();

Reply via email to