This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new acb5a54f3f1 [fix][broker] Correctly clean up heartbeat bundle ownership
when handling broker deletion event (#21083)
acb5a54f3f1 is described below
commit acb5a54f3f194b7a07062100670e94c0330e2a21
Author: Kai Wang <[email protected]>
AuthorDate: Mon Sep 4 17:49:35 2023 +0800
[fix][broker] Correctly clean up heartbeat bundle ownership when handling
broker deletion event (#21083)
---
.../pulsar/broker/admin/impl/BrokersBase.java | 5 +-
.../channel/ServiceUnitStateChannelImpl.java | 10 +--
.../pulsar/broker/namespace/NamespaceService.java | 38 ++++------
.../apache/pulsar/broker/admin/AdminApiTest.java | 2 +-
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 2 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 8 +--
.../channel/ServiceUnitStateChannelTest.java | 81 +++++++++++++++++-----
.../broker/namespace/NamespaceServiceTest.java | 2 +-
.../pulsar/broker/service/BrokerServiceTest.java | 6 +-
.../broker/service/InactiveTopicDeleteTest.java | 6 +-
.../systopic/PartitionedSystemTopicTest.java | 6 +-
.../apache/pulsar/compaction/CompactionTest.java | 4 +-
12 files changed, 106 insertions(+), 64 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index a61a2c940b8..d37a9eda1ab 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -397,9 +397,10 @@ public class BrokersBase extends AdminResource {
private CompletableFuture<Void> internalRunHealthCheck(TopicVersion
topicVersion) {
+ String lookupServiceAddress = pulsar().getLookupServiceAddress();
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
- ?
NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(),
pulsar().getConfiguration())
- :
NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(),
pulsar().getConfiguration());
+ ?
NamespaceService.getHeartbeatNamespaceV2(lookupServiceAddress,
pulsar().getConfiguration())
+ : NamespaceService.getHeartbeatNamespace(lookupServiceAddress,
pulsar().getConfiguration());
final String topicName = String.format("persistent://%s/%s",
namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(),
topicName);
final String messageStr = UUID.randomUUID().toString();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 4eb25848fda..e9fbf625edd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -41,6 +41,8 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
+import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
+import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static
org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -92,6 +94,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
@@ -1213,10 +1216,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
String heartbeatNamespace =
-
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration())
- .toString();
- String heartbeatNamespaceV2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
- pulsar.getConfiguration()).toString();
+ NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT,
config.getClusterName(), broker)).toString();
+ String heartbeatNamespaceV2 =
+ NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
broker)).toString();
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new
HashMap<>();
for (var etr : tableview.entrySet()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 141e48515b2..abf96361408 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -132,8 +132,8 @@ public class NamespaceService implements AutoCloseable {
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN =
Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 =
Pattern.compile("pulsar/([^:]+:\\d+)");
public static final Pattern SLA_NAMESPACE_PATTERN =
Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
- public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
- public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s:%s";
+ public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
+ public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY +
"/%s/%s:%s";
private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl>
namespaceClients;
@@ -159,7 +159,7 @@ public class NamespaceService implements AutoCloseable {
*/
public NamespaceService(PulsarService pulsar) {
this.pulsar = pulsar;
- host = pulsar.getAdvertisedAddress();
+ this.host = pulsar.getAdvertisedAddress();
this.config = pulsar.getConfiguration();
this.loadManager = pulsar.getLoadManager();
this.bundleFactory = new NamespaceBundleFactory(pulsar,
Hashing.crc32());
@@ -327,15 +327,17 @@ public class NamespaceService implements AutoCloseable {
* @throws PulsarServerException
*/
public void registerBootstrapNamespaces() throws PulsarServerException {
-
+ String lookupServiceAddress = pulsar.getLookupServiceAddress();
// ensure that we own the heartbeat namespace
- if (registerNamespace(getHeartbeatNamespace(host, config), true)) {
- LOG.info("added heartbeat namespace name in local cache: ns={}",
getHeartbeatNamespace(host, config));
+ if (registerNamespace(getHeartbeatNamespace(lookupServiceAddress,
config), true)) {
+ LOG.info("added heartbeat namespace name in local cache: ns={}",
+ getHeartbeatNamespace(lookupServiceAddress, config));
}
// ensure that we own the heartbeat namespace
- if (registerNamespace(getHeartbeatNamespaceV2(host, config), true)) {
- LOG.info("added heartbeat namespace name in local cache: ns={}",
getHeartbeatNamespaceV2(host, config));
+ if (registerNamespace(getHeartbeatNamespaceV2(lookupServiceAddress,
config), true)) {
+ LOG.info("added heartbeat namespace name in local cache: ns={}",
+ getHeartbeatNamespaceV2(lookupServiceAddress, config));
}
// we may not need strict ownership checking for bootstrap names for
now
@@ -1566,24 +1568,12 @@ public class NamespaceService implements AutoCloseable {
LOG.info("Namespace {} unloaded successfully", namespaceName);
}
- public static NamespaceName getHeartbeatNamespace(String host,
ServiceConfiguration config) {
- Integer port = null;
- if (config.getWebServicePort().isPresent()) {
- port = config.getWebServicePort().get();
- } else if (config.getWebServicePortTls().isPresent()) {
- port = config.getWebServicePortTls().get();
- }
- return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT,
config.getClusterName(), host, port));
+ public static NamespaceName getHeartbeatNamespace(String lookupBroker,
ServiceConfiguration config) {
+ return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT,
config.getClusterName(), lookupBroker));
}
- public static NamespaceName getHeartbeatNamespaceV2(String host,
ServiceConfiguration config) {
- Integer port = null;
- if (config.getWebServicePort().isPresent()) {
- port = config.getWebServicePort().get();
- } else if (config.getWebServicePortTls().isPresent()) {
- port = config.getWebServicePortTls().get();
- }
- return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
host, port));
+ public static NamespaceName getHeartbeatNamespaceV2(String lookupBroker,
ServiceConfiguration config) {
+ return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
lookupBroker));
}
public static NamespaceName getSLAMonitorNamespace(String host,
ServiceConfiguration config) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f9ca8cae625..83f27c3e9d8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -536,7 +536,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
-
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration())
+
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(),
pulsar.getConfiguration())
+ "/0x00000000_0xffffffff")) {
assertEquals(nsStatus.broker_assignment,
BrokerAssignment.shared);
assertFalse(nsStatus.is_controlled);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index b2052cdcbf0..f8614631a9f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -452,7 +452,7 @@ public class V1_AdminApiTest extends
MockedPulsarServiceBaseTest {
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
-
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration())
+
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(),
pulsar.getConfiguration())
+ "/0x00000000_0xffffffff")) {
assertEquals(nsStatus.broker_assignment,
BrokerAssignment.shared);
assertFalse(nsStatus.is_controlled);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 410c10eeac7..76202b1b0ae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1027,14 +1027,14 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws
Exception {
NamespaceName heartbeatNamespacePulsar1V1 =
-
NamespaceService.getHeartbeatNamespace(pulsar1.getAdvertisedAddress(),
pulsar1.getConfiguration());
+
NamespaceService.getHeartbeatNamespace(pulsar1.getLookupServiceAddress(),
pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar1V2 =
-
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getAdvertisedAddress(),
pulsar1.getConfiguration());
+
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(),
pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V1 =
-
NamespaceService.getHeartbeatNamespace(pulsar2.getAdvertisedAddress(),
pulsar2.getConfiguration());
+
NamespaceService.getHeartbeatNamespace(pulsar2.getLookupServiceAddress(),
pulsar2.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V2 =
-
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getAdvertisedAddress(),
pulsar2.getConfiguration());
+
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(),
pulsar2.getConfiguration());
NamespaceBundle bundle1 =
pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index d8b4f1d75d2..caf7f0d5d5e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -30,6 +30,8 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
+import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
+import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static
org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -87,6 +89,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -747,11 +750,41 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
String broker = lookupServiceAddress1;
channel1.publishAssignEventAsync(bundle1, broker);
channel2.publishAssignEventAsync(bundle2, broker);
+
waitUntilNewOwner(channel1, bundle1, broker);
waitUntilNewOwner(channel2, bundle1, broker);
waitUntilNewOwner(channel1, bundle2, broker);
waitUntilNewOwner(channel2, bundle2, broker);
+ // Register the broker-1 heartbeat namespace bundle.
+ String heartbeatNamespaceBroker1V1 = NamespaceName
+ .get(String.format(HEARTBEAT_NAMESPACE_FMT,
conf.getClusterName(), broker)).toString();
+ String heartbeatNamespaceBroker1V2 = NamespaceName
+ .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
broker)).toString();
+ String heartbeatNamespaceBroker1V1Bundle = heartbeatNamespaceBroker1V1
+ "/0x00000000_0xfffffff0";
+ String heartbeatNamespaceBroker1V2Bundle = heartbeatNamespaceBroker1V2
+ "/0x00000000_0xfffffff0";
+ channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V1Bundle,
broker);
+ channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V2Bundle,
broker);
+
+ // Register the broker-2 heartbeat namespace bundle.
+ String heartbeatNamespaceBroker2V1 = NamespaceName
+ .get(String.format(HEARTBEAT_NAMESPACE_FMT,
conf.getClusterName(), lookupServiceAddress2)).toString();
+ String heartbeatNamespaceBroker2V2 = NamespaceName
+ .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
lookupServiceAddress2)).toString();
+ String heartbeatNamespaceBroker2V1Bundle = heartbeatNamespaceBroker2V1
+ "/0x00000000_0xfffffff0";
+ String heartbeatNamespaceBroker2V2Bundle = heartbeatNamespaceBroker2V2
+ "/0x00000000_0xfffffff0";
+ channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V1Bundle,
lookupServiceAddress2);
+ channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V2Bundle,
lookupServiceAddress2);
+ waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, broker);
+ waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, broker);
+ waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, broker);
+ waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, broker);
+ waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle,
lookupServiceAddress2);
+ waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle,
lookupServiceAddress2);
+ waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle,
lookupServiceAddress2);
+ waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle,
lookupServiceAddress2);
+
+ // Verify to transfer the ownership to the other broker.
channel1.publishUnloadEventAsync(new Unload(broker, bundle1,
Optional.of(lookupServiceAddress2)));
waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
@@ -765,12 +798,24 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS
* 1000 + 1000), true);
leaderChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
followerChannel.handleBrokerRegistrationEvent(broker,
NotificationType.Deleted);
+ leaderChannel.handleBrokerRegistrationEvent(lookupServiceAddress2,
NotificationType.Deleted);
+ followerChannel.handleBrokerRegistrationEvent(lookupServiceAddress2,
NotificationType.Deleted);
waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2);
waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
+ waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, null);
+ waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, null);
+ waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, null);
+ waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, null);
+
+ waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, null);
+ waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, null);
+ waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, null);
+ waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, null);
+
verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker),
any());
@@ -780,11 +825,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
});
validateMonitorCounters(leaderChannel,
- 1,
+ 2,
0,
- 1,
+ 7,
0,
- 1,
+ 2,
0,
0);
@@ -811,11 +856,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
});
validateMonitorCounters(leaderChannel,
- 1,
+ 2,
0,
- 1,
+ 7,
0,
- 2,
+ 3,
0,
0);
@@ -832,11 +877,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
});
validateMonitorCounters(leaderChannel,
- 1,
+ 2,
0,
- 1,
+ 7,
0,
- 2,
+ 3,
0,
1);
@@ -854,11 +899,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
});
validateMonitorCounters(leaderChannel,
- 1,
+ 2,
0,
- 1,
+ 7,
0,
- 3,
+ 4,
0,
1);
@@ -876,11 +921,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
});
validateMonitorCounters(leaderChannel,
- 2,
- 0,
3,
0,
- 3,
+ 9,
+ 0,
+ 4,
0,
1);
@@ -905,11 +950,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
});
validateMonitorCounters(leaderChannel,
- 2,
- 0,
3,
0,
- 3,
+ 9,
+ 0,
+ 4,
1,
1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index ac5d92c8802..03bb53eb9da 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -683,7 +683,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
@Test
public void testHeartbeatNamespaceMatch() throws Exception {
- NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
+ NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(), conf);
NamespaceBundle namespaceBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
assertTrue(NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 04018d5fb9d..2489aa5f026 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1538,8 +1538,10 @@ public class BrokerServiceTest extends BrokerTestBase {
assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_ASSIGN));
assertTrue(brokerService.isSystemTopic(TRANSACTION_COORDINATOR_LOG));
- NamespaceName heartbeatNamespaceV1 =
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
pulsar.getConfig());
- NamespaceName heartbeatNamespaceV2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
pulsar.getConfig());
+ NamespaceName heartbeatNamespaceV1 = NamespaceService
+ .getHeartbeatNamespace(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
+ NamespaceName heartbeatNamespaceV2 = NamespaceService
+ .getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
assertTrue(brokerService.isSystemTopic("persistent://" +
heartbeatNamespaceV1.toString() + "/healthcheck"));
assertTrue(brokerService.isSystemTopic(heartbeatNamespaceV2.toString()
+ "/healthcheck"));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index df58188da3e..ddb4e30b9ac 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -597,10 +597,12 @@ public class InactiveTopicDeleteTest extends
BrokerTestBase {
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
// init topic
- NamespaceName heartbeatNamespaceV1 =
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
pulsar.getConfig());
+ NamespaceName heartbeatNamespaceV1 = NamespaceService
+ .getHeartbeatNamespace(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
final String healthCheckTopicV1 = "persistent://" +
heartbeatNamespaceV1 + "/healthcheck";
- NamespaceName heartbeatNamespaceV2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
pulsar.getConfig());
+ NamespaceName heartbeatNamespaceV2 = NamespaceService
+ .getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
final String healthCheckTopicV2 = "persistent://" +
heartbeatNamespaceV2 + "/healthcheck";
admin.brokers().healthcheck(TopicVersion.V1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 34e7dfe92e5..4af0bd90523 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -164,7 +164,7 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
@Test
public void testHealthCheckTopicNotOffload() throws Exception {
- NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+ NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName,
BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
@@ -184,7 +184,7 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
@Test
public void testSystemNamespaceNotCreateChangeEventsTopic() throws
Exception {
admin.brokers().healthcheck(TopicVersion.V2);
- NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+ NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName,
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
Optional<Topic> optionalTopic = pulsar.getBrokerService()
@@ -195,7 +195,7 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
@Test
public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
- NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+ NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName,
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
for (int partition = 0; partition < PARTITIONS; partition ++) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c5dbd9c49aa..e603b3ccc4d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1771,9 +1771,9 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@SneakyThrows
@Test
public void testHealthCheckTopicNotCompacted() {
- NamespaceName heartbeatNamespaceV1 =
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration());
+ NamespaceName heartbeatNamespaceV1 =
NamespaceService.getHeartbeatNamespace(pulsar.getLookupServiceAddress(),
pulsar.getConfiguration());
String topicV1 = "persistent://" + heartbeatNamespaceV1.toString() +
"/healthcheck";
- NamespaceName heartbeatNamespaceV2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration());
+ NamespaceName heartbeatNamespaceV2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
pulsar.getConfiguration());
String topicV2 = heartbeatNamespaceV2.toString() + "/healthcheck";
Producer<byte[]> producer1 =
pulsarClient.newProducer().topic(topicV1).create();
Producer<byte[]> producer2 =
pulsarClient.newProducer().topic(topicV2).create();