This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f85e0dc085d [fix][broker] Fix lookup heartbeat and sla namespace
bundle when using extensible load manager (#21213)
f85e0dc085d is described below
commit f85e0dc085d47587037cbc3f7d5443745384d350
Author: Kai Wang <[email protected]>
AuthorDate: Sun Oct 8 08:52:28 2023 +0800
[fix][broker] Fix lookup heartbeat and sla namespace bundle when using
extensible load manager (#21213)
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../apache/pulsar/broker/loadbalance/LoadData.java | 2 +-
.../extensions/ExtensibleLoadManagerImpl.java | 338 ++++++++++++---------
.../channel/ServiceUnitStateChannelImpl.java | 71 ++---
.../loadbalance/extensions/models/TopKBundles.java | 5 +-
.../pulsar/broker/namespace/NamespaceService.java | 38 +--
.../extensions/ExtensibleLoadManagerImplTest.java | 70 ++++-
.../channel/ServiceUnitStateChannelTest.java | 57 +---
8 files changed, 320 insertions(+), 263 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 46f0756a82a..e2950594047 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1176,7 +1176,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
protected void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
- NamespaceName nsName =
NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config);
+ NamespaceName nsName =
NamespaceService.getSLAMonitorNamespace(getLookupServiceAddress(), config);
if
(!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) {
LOG.info("SLA Namespace = {} doesn't exist.", nsName);
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index a632a47f051..c1fe2a4930c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -64,7 +64,7 @@ public class LoadData {
public Map<String, BundleData> getBundleDataForLoadShedding() {
return bundleData.entrySet().stream()
- .filter(e -> !NamespaceService.filterNamespaceForShedding(
+ .filter(e -> !NamespaceService.isSLAOrHeartbeatNamespace(
NamespaceBundle.getBundleNamespace(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index cba499eb8ee..85baf9ec4fb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -86,6 +87,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsa
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -95,7 +97,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;
@@ -152,6 +153,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
@Getter
private final List<BrokerFilter> brokerFilterPipeline;
+
/**
* The load data reporter.
*/
@@ -181,10 +183,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
// record split metrics
private final AtomicReference<List<Metrics>> splitMetrics = new
AtomicReference<>();
- private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
- lookupRequests = ConcurrentOpenHashMap.<String,
- CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
- .build();
+ private final ConcurrentHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = new ConcurrentHashMap<>();
private final CountDownLatch initWaiter = new CountDownLatch(1);
/**
@@ -197,7 +197,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
Set<Map.Entry<String, ServiceUnitStateData>> entrySet =
serviceUnitStateChannel.getOwnershipEntrySet();
String brokerId = brokerRegistry.getBrokerId();
- return entrySet.stream()
+ Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
return stateData.state() == ServiceUnitState.Owned
@@ -207,6 +207,36 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
var bundle = entry.getKey();
return getNamespaceBundle(pulsar, bundle);
}).collect(Collectors.toSet());
+ // Add heartbeat and SLA monitor namespace bundle.
+ NamespaceName heartbeatNamespace =
NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(heartbeatNamespace);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get heartbeat namespace bundle.", e);
+ }
+ NamespaceName heartbeatNamespaceV2 = NamespaceService
+ .getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(heartbeatNamespaceV2);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get heartbeat namespace V2 bundle.", e);
+ }
+
+ NamespaceName slaMonitorNamespace = NamespaceService
+ .getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
+ try {
+ NamespaceBundle fullBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(slaMonitorNamespace);
+ ownedServiceUnits.add(fullBundle);
+ } catch (Exception e) {
+ log.warn("Failed to get SLA Monitor namespace bundle.", e);
+ }
+
+ return ownedServiceUnits;
}
public enum Role {
@@ -261,102 +291,108 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
if (this.started) {
return;
}
- this.brokerRegistry = new BrokerRegistryImpl(pulsar);
- this.leaderElectionService = new LeaderElectionService(
- pulsar.getCoordinationService(),
pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
- state -> {
- pulsar.getLoadManagerExecutor().execute(() -> {
- if (state == LeaderElectionState.Leading) {
- playLeader();
- } else {
- playFollower();
- }
+ try {
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.leaderElectionService = new LeaderElectionService(
+ pulsar.getCoordinationService(),
pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
+ state -> {
+ pulsar.getLoadManagerExecutor().execute(() -> {
+ if (state == LeaderElectionState.Leading) {
+ playLeader();
+ } else {
+ playFollower();
+ }
+ });
});
- });
- this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
- this.brokerRegistry.start();
- this.splitManager = new SplitManager(splitCounter);
- this.unloadManager = new UnloadManager(unloadCounter);
- this.serviceUnitStateChannel.listen(unloadManager);
- this.serviceUnitStateChannel.listen(splitManager);
- this.leaderElectionService.start();
- this.serviceUnitStateChannel.start();
- this.antiAffinityGroupPolicyHelper =
- new AntiAffinityGroupPolicyHelper(pulsar,
serviceUnitStateChannel);
- antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
- this.antiAffinityGroupPolicyFilter = new
AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
- this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
- SimpleResourceAllocationPolicies policies = new
SimpleResourceAllocationPolicies(pulsar);
- this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
- this.brokerFilterPipeline.add(new
BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
-
- createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
- createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+ this.serviceUnitStateChannel = new
ServiceUnitStateChannelImpl(pulsar);
+ this.brokerRegistry.start();
+ this.splitManager = new SplitManager(splitCounter);
+ this.unloadManager = new UnloadManager(unloadCounter);
+ this.serviceUnitStateChannel.listen(unloadManager);
+ this.serviceUnitStateChannel.listen(splitManager);
+ this.leaderElectionService.start();
+ this.serviceUnitStateChannel.start();
+ this.antiAffinityGroupPolicyHelper =
+ new AntiAffinityGroupPolicyHelper(pulsar,
serviceUnitStateChannel);
+ antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
+ this.antiAffinityGroupPolicyFilter = new
AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
+ this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
+ SimpleResourceAllocationPolicies policies = new
SimpleResourceAllocationPolicies(pulsar);
+ this.isolationPoliciesHelper = new
IsolationPoliciesHelper(policies);
+ this.brokerFilterPipeline.add(new
BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
+
+ createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
+ createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
- try {
- this.brokerLoadDataStore = LoadDataStoreFactory
- .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC,
BrokerLoadData.class);
- this.brokerLoadDataStore.startTableView();
- this.topBundlesLoadDataStore = LoadDataStoreFactory
- .create(pulsar.getClient(),
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
- } catch (LoadDataStoreException e) {
- throw new PulsarServerException(e);
- }
+ try {
+ this.brokerLoadDataStore = LoadDataStoreFactory
+ .create(pulsar.getClient(),
BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+ this.brokerLoadDataStore.startTableView();
+ this.topBundlesLoadDataStore = LoadDataStoreFactory
+ .create(pulsar.getClient(),
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+ } catch (LoadDataStoreException e) {
+ throw new PulsarServerException(e);
+ }
- this.context = LoadManagerContextImpl.builder()
- .configuration(conf)
- .brokerRegistry(brokerRegistry)
- .brokerLoadDataStore(brokerLoadDataStore)
- .topBundleLoadDataStore(topBundlesLoadDataStore).build();
-
- this.brokerLoadDataReporter =
- new BrokerLoadDataReporter(pulsar,
brokerRegistry.getBrokerId(), brokerLoadDataStore);
-
- this.topBundleLoadDataReporter =
- new TopBundleLoadDataReporter(pulsar,
brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
- this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
- this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
- var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
- this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- try {
- brokerLoadDataReporter.reportAsync(false);
- // TODO: update broker load metrics using
getLocalData
- } catch (Throwable e) {
- log.error("Failed to run the broker load
manager executor job.", e);
- }
- },
- interval,
- interval, TimeUnit.MILLISECONDS);
-
- this.topBundlesLoadDataReportTask =
this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- try {
- // TODO: consider excluding the bundles that
are in the process of split.
- topBundleLoadDataReporter.reportAsync(false);
- } catch (Throwable e) {
- log.error("Failed to run the top bundles load
manager executor job.", e);
- }
- },
- interval,
- interval, TimeUnit.MILLISECONDS);
-
- this.monitorTask = this.pulsar.getLoadManagerExecutor()
- .scheduleAtFixedRate(() -> {
- monitor();
- },
- MONITOR_INTERVAL_IN_MILLIS,
- MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
-
- this.unloadScheduler = new UnloadScheduler(
- pulsar, pulsar.getLoadManagerExecutor(), unloadManager,
context,
- serviceUnitStateChannel, unloadCounter, unloadMetrics);
- this.unloadScheduler.start();
- this.splitScheduler = new SplitScheduler(
- pulsar, serviceUnitStateChannel, splitManager, splitCounter,
splitMetrics, context);
- this.splitScheduler.start();
- this.initWaiter.countDown();
- this.started = true;
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(brokerLoadDataStore)
+ .topBundleLoadDataStore(topBundlesLoadDataStore).build();
+
+ this.brokerLoadDataReporter =
+ new BrokerLoadDataReporter(pulsar,
brokerRegistry.getBrokerId(), brokerLoadDataStore);
+
+ this.topBundleLoadDataReporter =
+ new TopBundleLoadDataReporter(pulsar,
brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
+ this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
+ this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
+ var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
+ this.brokerLoadDataReportTask =
this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ brokerLoadDataReporter.reportAsync(false);
+ // TODO: update broker load metrics using
getLocalData
+ } catch (Throwable e) {
+ log.error("Failed to run the broker load
manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.topBundlesLoadDataReportTask =
this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ // TODO: consider excluding the bundles
that are in the process of split.
+
topBundleLoadDataReporter.reportAsync(false);
+ } catch (Throwable e) {
+ log.error("Failed to run the top bundles
load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.monitorTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ monitor();
+ },
+ MONITOR_INTERVAL_IN_MILLIS,
+ MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
+
+ this.unloadScheduler = new UnloadScheduler(
+ pulsar, pulsar.getLoadManagerExecutor(), unloadManager,
context,
+ serviceUnitStateChannel, unloadCounter, unloadMetrics);
+ this.unloadScheduler.start();
+ this.splitScheduler = new SplitScheduler(
+ pulsar, serviceUnitStateChannel, splitManager,
splitCounter, splitMetrics, context);
+ this.splitScheduler.start();
+ this.initWaiter.countDown();
+ this.started = true;
+ } catch (Exception ex) {
+ if (this.brokerRegistry != null) {
+ brokerRegistry.close();
+ }
+ }
}
@Override
@@ -377,25 +413,38 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
- owner = getOwnerAsync(serviceUnit, bundle,
false).thenApply(Optional::ofNullable);
+ String candidateBrokerId =
getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
+ if (candidateBrokerId != null) {
+ owner =
CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
+ } else {
+ owner = getOrSelectOwnerAsync(serviceUnit,
bundle).thenApply(Optional::ofNullable);
+ }
}
return getBrokerLookupData(owner, bundle);
});
}
- private CompletableFuture<String> getOwnerAsync(
- ServiceUnitId serviceUnit, String bundle, boolean
ownByLocalBrokerIfAbsent) {
+ private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnit)
{
+ // Check if this is Heartbeat or SLAMonitor namespace
+ String candidateBroker =
NamespaceService.checkHeartbeatNamespace(serviceUnit);
+ if (candidateBroker == null) {
+ candidateBroker =
NamespaceService.checkHeartbeatNamespaceV2(serviceUnit);
+ }
+ if (candidateBroker == null) {
+ candidateBroker =
NamespaceService.getSLAMonitorBrokerName(serviceUnit);
+ }
+ if (candidateBroker != null) {
+ return candidateBroker.substring(candidateBroker.lastIndexOf('/')
+ 1);
+ }
+ return candidateBroker;
+ }
+
+ private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId
serviceUnit,
+ String bundle) {
return
serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event
to channel.
if (broker.isEmpty()) {
- CompletableFuture<Optional<String>> selectedBroker;
- if (ownByLocalBrokerIfAbsent) {
- String brokerId = this.brokerRegistry.getBrokerId();
- selectedBroker =
CompletableFuture.completedFuture(Optional.of(brokerId));
- } else {
- selectedBroker = this.selectAsync(serviceUnit);
- }
- return selectedBroker.thenCompose(brokerOpt -> {
+ return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle:
{}.", brokerOpt.get(), bundle);
@@ -425,7 +474,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}).thenCompose(broker ->
this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
- "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ "Failed to lookup broker:%s for bundle:%s, the broker
has not been registered.",
+ broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
@@ -443,30 +493,37 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
public CompletableFuture<NamespaceEphemeralData>
tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
log.info("Try acquiring ownership for bundle: {} - {}.",
namespaceBundle, brokerRegistry.getBrokerId());
final String bundle = namespaceBundle.toString();
- return dedupeLookupRequest(bundle, k -> {
- final CompletableFuture<String> owner =
- this.getOwnerAsync(namespaceBundle, bundle, true);
- return getBrokerLookupData(owner.thenApply(Optional::ofNullable),
bundle);
- }).thenApply(brokerLookupData -> {
- if (brokerLookupData.isEmpty()) {
- throw new IllegalStateException(
- "Failed to get the broker lookup data for bundle: " +
bundle);
- }
- return brokerLookupData.get().toNamespaceEphemeralData();
- });
+ return assign(Optional.empty(), namespaceBundle)
+ .thenApply(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to get the broker lookup data for
bundle:%s", bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return brokerLookupData.get().toNamespaceEphemeralData();
+ });
}
private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
String key, Function<String,
CompletableFuture<Optional<BrokerLookupData>>> provider) {
- CompletableFuture<Optional<BrokerLookupData>> future =
lookupRequests.computeIfAbsent(key, provider);
- future.whenComplete((r, t) -> {
- if (t != null) {
+ final MutableObject<CompletableFuture<Optional<BrokerLookupData>>>
newFutureCreated = new MutableObject<>();
+ try {
+ return lookupRequests.computeIfAbsent(key, k -> {
+ CompletableFuture<Optional<BrokerLookupData>> future =
provider.apply(k);
+ newFutureCreated.setValue(future);
+ return future;
+ });
+ } finally {
+ if (newFutureCreated.getValue() != null) {
+ newFutureCreated.getValue().whenComplete((v, ex) -> {
+ if (ex != null) {
assignCounter.incrementFailure();
}
- lookupRequests.remove(key);
- }
- );
- return future;
+ lookupRequests.remove(key, newFutureCreated.getValue());
+ });
+ }
+ }
}
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle) {
@@ -521,15 +578,16 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
public CompletableFuture<Optional<String>>
getOwnershipAsync(Optional<ServiceUnitId> topic,
- ServiceUnitId
bundleUnit) {
- final String bundle = bundleUnit.toString();
- CompletableFuture<Optional<String>> owner;
+ ServiceUnitId
serviceUnit) {
+ final String bundle = serviceUnit.toString();
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
- owner = serviceUnitStateChannel.getChannelOwnerAsync();
- } else {
- owner = serviceUnitStateChannel.getOwnerAsync(bundle);
+ return serviceUnitStateChannel.getChannelOwnerAsync();
}
- return owner;
+ String candidateBroker = getHeartbeatOrSLAMonitorBrokerId(serviceUnit);
+ if (candidateBroker != null) {
+ return
CompletableFuture.completedFuture(Optional.of(candidateBroker));
+ }
+ return serviceUnitStateChannel.getOwnerAsync(bundle);
}
public CompletableFuture<Optional<BrokerLookupData>>
getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) {
@@ -543,6 +601,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId
bundle,
Optional<String>
destinationBroker) {
+ if
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
{
+ log.info("Skip unloading namespace bundle: {}.", bundle);
+ return CompletableFuture.completedFuture(null);
+ }
return getOwnershipAsync(Optional.empty(), bundle)
.thenCompose(brokerOpt -> {
if (brokerOpt.isEmpty()) {
@@ -577,6 +639,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
public CompletableFuture<Void> splitNamespaceBundleAsync(ServiceUnitId
bundle,
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long>
boundaries) {
+ if
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
{
+ log.info("Skip split namespace bundle: {}.", bundle);
+ return CompletableFuture.completedFuture(null);
+ }
final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle.toString());
final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle.toString());
NamespaceBundle namespaceBundle =
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 98aa02d4e72..d71513652e9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -41,8 +41,6 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
-import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
-import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static
org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -94,7 +92,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
@@ -1216,48 +1213,19 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
- String heartbeatNamespace =
- NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT,
config.getClusterName(), broker)).toString();
- String heartbeatNamespaceV2 =
- NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
broker)).toString();
-
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new
HashMap<>();
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
var serviceUnit = etr.getKey();
var state = state(stateData);
- if (StringUtils.equals(broker, stateData.dstBroker())) {
- if (isActiveState(state)) {
- if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
- orphanSystemServiceUnits.put(serviceUnit, stateData);
- } else if (serviceUnit.startsWith(heartbeatNamespace)
- || serviceUnit.startsWith(heartbeatNamespaceV2)) {
- // Skip the heartbeat namespace
- log.info("Skip override heartbeat namespace bundle"
- + " serviceUnit:{}, stateData:{}",
serviceUnit, stateData);
- tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
- if (e != null) {
- log.error("Failed cleaning the heartbeat
namespace ownership serviceUnit:{}, "
- + "stateData:{},
cleanupErrorCnt:{}.",
- serviceUnit, stateData,
- totalCleanupErrorCnt.incrementAndGet()
- totalCleanupErrorCntStart, e);
- }
- });
- } else {
- overrideOwnership(serviceUnit, stateData, broker);
- }
- orphanServiceUnitCleanupCnt++;
- }
-
- } else if (StringUtils.equals(broker, stateData.sourceBroker())) {
- if (isInFlightState(state)) {
- if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
- orphanSystemServiceUnits.put(serviceUnit, stateData);
- } else {
- overrideOwnership(serviceUnit, stateData, broker);
- }
- orphanServiceUnitCleanupCnt++;
+ if (StringUtils.equals(broker, stateData.dstBroker()) &&
isActiveState(state)
+ || StringUtils.equals(broker, stateData.sourceBroker()) &&
isInFlightState(state)) {
+ if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+ orphanSystemServiceUnits.put(serviceUnit, stateData);
+ } else {
+ overrideOwnership(serviceUnit, stateData, broker);
}
+ orphanServiceUnitCleanupCnt++;
}
}
@@ -1401,16 +1369,21 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
String srcBroker = stateData.sourceBroker();
var state = stateData.state();
- if (isActiveState(state)) {
- if (StringUtils.isNotBlank(srcBroker) &&
!activeBrokers.contains(srcBroker)) {
- inactiveBrokers.add(srcBroker);
- } else if (StringUtils.isNotBlank(dstBroker) &&
!activeBrokers.contains(dstBroker)) {
- inactiveBrokers.add(dstBroker);
- } else if (isInFlightState(state)
- && now - stateData.timestamp() >
inFlightStateWaitingTimeInMillis) {
- orphanServiceUnits.put(serviceUnit, stateData);
- }
- } else if (now - stateData.timestamp() >
semiTerminalStateWaitingTimeInMillis) {
+ if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) &&
!activeBrokers.contains(srcBroker)) {
+ inactiveBrokers.add(srcBroker);
+ continue;
+ }
+ if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) &&
!activeBrokers.contains(dstBroker)) {
+ inactiveBrokers.add(dstBroker);
+ continue;
+ }
+ if (isActiveState(state) && isInFlightState(state)
+ && now - stateData.timestamp() >
inFlightStateWaitingTimeInMillis) {
+ orphanServiceUnits.put(serviceUnit, stateData);
+ continue;
+ }
+
+ if (now - stateData.timestamp() >
semiTerminalStateWaitingTimeInMillis) {
log.info("Found semi-terminal states to tombstone"
+ " serviceUnit:{}, stateData:{}", serviceUnit,
stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
index 2f5c32197c1..624546fdff8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
@@ -30,6 +30,8 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
@@ -70,7 +72,8 @@ public class TopKBundles {
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
- if
(bundle.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
+ // TODO: do not filter system topic while shedding
+ if
(NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle)))
{
continue;
}
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled &&
hasPolicies(bundle)) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 58bef284133..0adc2b59470 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -135,7 +135,7 @@ public class NamespaceService implements AutoCloseable {
public static final Pattern SLA_NAMESPACE_PATTERN =
Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
- public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY +
"/%s/%s:%s";
+ public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY +
"/%s/%s";
private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl>
namespaceClients;
@@ -189,7 +189,7 @@ public class NamespaceService implements AutoCloseable {
CompletableFuture<Optional<LookupResult>> future =
getBundleAsync(topic)
.thenCompose(bundle -> {
// Do redirection if the cluster is in rollback or
deploying.
- return
redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> {
+ return
findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for
topic {}",
pulsar.getSafeWebServiceAddress(),
optResult.get(), topic);
@@ -221,6 +221,13 @@ public class NamespaceService implements AutoCloseable {
return future;
}
+ private CompletableFuture<Optional<LookupResult>>
findRedirectLookupResultAsync(ServiceUnitId bundle) {
+ if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
{
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ return redirectManager.findRedirectLookupResultAsync();
+ }
+
public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
.thenApply(bundles -> bundles.findBundle(topic));
@@ -288,8 +295,7 @@ public class NamespaceService implements AutoCloseable {
private CompletableFuture<Optional<URL>>
internalGetWebServiceUrl(@Nullable ServiceUnitId topic,
NamespaceBundle bundle,
LookupOptions options) {
-
- return
redirectManager.findRedirectLookupResultAsync().thenCompose(optResult -> {
+ return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for topic {}",
pulsar.getSafeWebServiceAddress(), optResult.get(),
topic);
@@ -695,7 +701,7 @@ public class NamespaceService implements AutoCloseable {
return lookupFuture;
}
- private boolean isBrokerActive(String candidateBroker) {
+ public boolean isBrokerActive(String candidateBroker) {
String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
Set<String> availableBrokers = getAvailableBrokers();
if (availableBrokers.contains(candidateBrokerHostAndPort)) {
@@ -1564,7 +1570,7 @@ public class NamespaceService implements AutoCloseable {
}
public void unloadSLANamespace() throws Exception {
- NamespaceName namespaceName = getSLAMonitorNamespace(host, config);
+ NamespaceName namespaceName =
getSLAMonitorNamespace(pulsar.getLookupServiceAddress(), config);
LOG.info("Checking owner for SLA namespace {}", namespaceName);
@@ -1589,14 +1595,8 @@ public class NamespaceService implements AutoCloseable {
return NamespaceName.get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
lookupBroker));
}
- public static NamespaceName getSLAMonitorNamespace(String host,
ServiceConfiguration config) {
- Integer port = null;
- if (config.getWebServicePortTls().isPresent()) {
- port = config.getWebServicePortTls().get();
- } else if (config.getWebServicePort().isPresent()) {
- port = config.getWebServicePort().get();
- }
- return NamespaceName.get(String.format(SLA_NAMESPACE_FMT,
config.getClusterName(), host, port));
+ public static NamespaceName getSLAMonitorNamespace(String lookupBroker,
ServiceConfiguration config) {
+ return NamespaceName.get(String.format(SLA_NAMESPACE_FMT,
config.getClusterName(), lookupBroker));
}
public static String checkHeartbeatNamespace(ServiceUnitId ns) {
@@ -1640,7 +1640,7 @@ public class NamespaceService implements AutoCloseable {
* @param namespace the namespace name
* @return True if namespace is HEARTBEAT_NAMESPACE or SLA_NAMESPACE
*/
- public static boolean filterNamespaceForShedding(String namespace) {
+ public static boolean isSLAOrHeartbeatNamespace(String namespace) {
return SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
@@ -1653,14 +1653,16 @@ public class NamespaceService implements AutoCloseable {
}
public boolean registerSLANamespace() throws PulsarServerException {
- boolean isNameSpaceRegistered =
registerNamespace(getSLAMonitorNamespace(host, config), false);
+ String lookupServiceAddress = pulsar.getLookupServiceAddress();
+ boolean isNameSpaceRegistered =
registerNamespace(getSLAMonitorNamespace(lookupServiceAddress, config), false);
if (isNameSpaceRegistered) {
if (LOG.isDebugEnabled()) {
LOG.debug("Added SLA Monitoring namespace name in local cache:
ns={}",
- getSLAMonitorNamespace(host, config));
+ getSLAMonitorNamespace(lookupServiceAddress, config));
}
} else if (LOG.isDebugEnabled()) {
- LOG.debug("SLA Monitoring not owned by the broker: ns={}",
getSLAMonitorNamespace(host, config));
+ LOG.debug("SLA Monitoring not owned by the broker: ns={}",
+ getSLAMonitorNamespace(lookupServiceAddress, config));
}
return isNameSpaceRegistered;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 9ce57a88540..011e7174cbe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -35,6 +35,9 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import static
org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace;
+import static
org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2;
+import static
org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -594,6 +597,18 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertTrue(webServiceUrl3.isPresent());
assertEquals(webServiceUrl3.get().toString(),
webServiceUrl1.get().toString());
+ List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2,
pulsar3);
+ for (PulsarService pulsarService : pulsarServices) {
+ // Test lookup heartbeat namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupHeartbeatOwner(pulsarService,
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ // Test lookup SLA namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupSLANamespaceOwner(pulsarService,
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ }
+
// Test deploy new broker with new load manager
ServiceConfiguration conf = getDefaultConf();
conf.setAllowAutoTopicCreation(true);
@@ -642,10 +657,48 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertTrue(webServiceUrl4.isPresent());
assertEquals(webServiceUrl4.get().toString(),
webServiceUrl1.get().toString());
+ pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+ for (PulsarService pulsarService : pulsarServices) {
+ // Test lookup heartbeat namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupHeartbeatOwner(pulsarService,
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ // Test lookup SLA namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupSLANamespaceOwner(pulsarService,
pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ }
}
}
}
+ private void assertLookupHeartbeatOwner(PulsarService pulsar,
+ String lookupServiceAddress,
+ String expectedBrokerServiceUrl)
throws Exception {
+ NamespaceName heartbeatNamespaceV1 =
+ getHeartbeatNamespace(lookupServiceAddress,
pulsar.getConfiguration());
+
+ String heartbeatV1Topic =
heartbeatNamespaceV1.getPersistentTopicName("test");
+
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV1Topic),
expectedBrokerServiceUrl);
+
+ NamespaceName heartbeatNamespaceV2 =
+ getHeartbeatNamespaceV2(lookupServiceAddress,
pulsar.getConfiguration());
+
+ String heartbeatV2Topic =
heartbeatNamespaceV2.getPersistentTopicName("test");
+
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(heartbeatV2Topic),
expectedBrokerServiceUrl);
+ }
+
+ private void assertLookupSLANamespaceOwner(PulsarService pulsar,
+ String lookupServiceAddress,
+ String
expectedBrokerServiceUrl) throws Exception {
+ NamespaceName slaMonitorNamespace =
getSLAMonitorNamespace(lookupServiceAddress, pulsar.getConfiguration());
+ String slaMonitorTopic =
slaMonitorNamespace.getPersistentTopicName("test");
+ String result =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+ log.info("Topic {} Lookup result: {}", slaMonitorTopic, result);
+ assertNotNull(result);
+ assertEquals(result, expectedBrokerServiceUrl);
+ }
+
@Test
public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws
Exception {
var topBundlesLoadDataStorePrimary =
@@ -1043,15 +1096,15 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().deleteNamespace(namespace, true);
}
- @Test(timeOut = 30 * 1000)
+ @Test(timeOut = 30 * 1000, priority = -1)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws
Exception {
NamespaceName heartbeatNamespacePulsar1V1 =
-
NamespaceService.getHeartbeatNamespace(pulsar1.getLookupServiceAddress(),
pulsar1.getConfiguration());
+ getHeartbeatNamespace(pulsar1.getLookupServiceAddress(),
pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar1V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getLookupServiceAddress(),
pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V1 =
-
NamespaceService.getHeartbeatNamespace(pulsar2.getLookupServiceAddress(),
pulsar2.getConfiguration());
+ getHeartbeatNamespace(pulsar2.getLookupServiceAddress(),
pulsar2.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(),
pulsar2.getConfiguration());
@@ -1068,22 +1121,22 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 =
primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
// heartbeat namespace bundle will own by pulsar1
- assertEquals(ownedServiceUnitsByPulsar1.size(), 2);
+ assertEquals(ownedServiceUnitsByPulsar1.size(), 3);
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 =
secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
- assertEquals(ownedServiceUnitsByPulsar2.size(), 2);
+ assertEquals(ownedServiceUnitsByPulsar2.size(), 3);
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(),
pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(),
pulsar2.getLookupServiceAddress());
- assertEquals(ownedNamespacesByPulsar1.size(), 2);
+ assertEquals(ownedNamespacesByPulsar1.size(), 3);
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
- assertEquals(ownedNamespacesByPulsar2.size(), 2);
+ assertEquals(ownedNamespacesByPulsar2.size(), 3);
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));
@@ -1134,7 +1187,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
String topic = "persistent://" + namespace + "/test";
NamespaceBundle bundle = getBundleAsync(pulsar1,
TopicName.get(topic)).get();
NamespaceEphemeralData namespaceEphemeralData =
primaryLoadManager.tryAcquiringOwnership(bundle).get();
- assertEquals(namespaceEphemeralData.getNativeUrl(),
pulsar1.getBrokerServiceUrl());
+ assertTrue(Set.of(pulsar1.getBrokerServiceUrl(),
pulsar2.getBrokerServiceUrl())
+ .contains(namespaceEphemeralData.getNativeUrl()));
admin.namespaces().deleteNamespace(namespace, true);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index de21842f458..a226df53e12 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -30,8 +30,6 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
-import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT;
-import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_FMT_V2;
import static
org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
@@ -89,7 +87,6 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -639,7 +636,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 :
channel2;
validateMonitorCounters(leader,
0,
- 1,
+ 3,
0,
0,
0,
@@ -756,34 +753,6 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel1, bundle2, broker);
waitUntilNewOwner(channel2, bundle2, broker);
- // Register the broker-1 heartbeat namespace bundle.
- String heartbeatNamespaceBroker1V1 = NamespaceName
- .get(String.format(HEARTBEAT_NAMESPACE_FMT,
conf.getClusterName(), broker)).toString();
- String heartbeatNamespaceBroker1V2 = NamespaceName
- .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
broker)).toString();
- String heartbeatNamespaceBroker1V1Bundle = heartbeatNamespaceBroker1V1
+ "/0x00000000_0xfffffff0";
- String heartbeatNamespaceBroker1V2Bundle = heartbeatNamespaceBroker1V2
+ "/0x00000000_0xfffffff0";
- channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V1Bundle,
broker);
- channel1.publishAssignEventAsync(heartbeatNamespaceBroker1V2Bundle,
broker);
-
- // Register the broker-2 heartbeat namespace bundle.
- String heartbeatNamespaceBroker2V1 = NamespaceName
- .get(String.format(HEARTBEAT_NAMESPACE_FMT,
conf.getClusterName(), lookupServiceAddress2)).toString();
- String heartbeatNamespaceBroker2V2 = NamespaceName
- .get(String.format(HEARTBEAT_NAMESPACE_FMT_V2,
lookupServiceAddress2)).toString();
- String heartbeatNamespaceBroker2V1Bundle = heartbeatNamespaceBroker2V1
+ "/0x00000000_0xfffffff0";
- String heartbeatNamespaceBroker2V2Bundle = heartbeatNamespaceBroker2V2
+ "/0x00000000_0xfffffff0";
- channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V1Bundle,
lookupServiceAddress2);
- channel1.publishAssignEventAsync(heartbeatNamespaceBroker2V2Bundle,
lookupServiceAddress2);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, broker);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, broker);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, broker);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, broker);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle,
lookupServiceAddress2);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle,
lookupServiceAddress2);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle,
lookupServiceAddress2);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle,
lookupServiceAddress2);
-
// Verify to transfer the ownership to the other broker.
channel1.publishUnloadEventAsync(new Unload(broker, bundle1,
Optional.of(lookupServiceAddress2)));
waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2);
@@ -806,16 +775,6 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2);
waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V1Bundle, null);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker1V2Bundle, null);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V1Bundle, null);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker1V2Bundle, null);
-
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V1Bundle, null);
- waitUntilNewOwner(channel1, heartbeatNamespaceBroker2V2Bundle, null);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V1Bundle, null);
- waitUntilNewOwner(channel2, heartbeatNamespaceBroker2V2Bundle, null);
-
verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker),
any());
@@ -827,7 +786,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
2,
0,
- 7,
+ 3,
0,
2,
0,
@@ -858,7 +817,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
2,
0,
- 7,
+ 3,
0,
3,
0,
@@ -879,7 +838,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
2,
0,
- 7,
+ 3,
0,
3,
0,
@@ -901,7 +860,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
2,
0,
- 7,
+ 3,
0,
4,
0,
@@ -923,7 +882,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
3,
0,
- 9,
+ 5,
0,
4,
0,
@@ -952,7 +911,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
validateMonitorCounters(leaderChannel,
3,
0,
- 9,
+ 5,
0,
4,
1,
@@ -1447,7 +1406,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
validateMonitorCounters(leader,
0,
- 1,
+ 3,
1,
0,
0,