This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 32e677d5264 [improve][broker] PIP-192 updated metrics and cleanup broker selector (#19945)
32e677d5264 is described below
commit 32e677d5264d9145a5765735ae1e6ba0e88a5ef9
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Apr 5 05:00:32 2023 -0700
[improve][broker] PIP-192 updated metrics and cleanup broker selector (#19945)
---
.../apache/pulsar/PulsarClusterMetadataSetup.java | 4 +-
.../extensions/ExtensibleLoadManagerImpl.java | 27 ++-
.../channel/ServiceUnitStateChannelImpl.java | 197 +++++++++++++++------
.../extensions/models/AssignCounter.java | 10 +-
.../DefaultNamespaceBundleSplitStrategyImpl.java | 3 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 93 ++++------
.../channel/ServiceUnitStateChannelTest.java | 35 ++--
.../DefaultNamespaceBundleSplitStrategyTest.java | 2 +-
8 files changed, 212 insertions(+), 159 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 1a2ca1ec4fb..0badbda1afd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -392,8 +392,8 @@ public class PulsarClusterMetadataSetup {
}
}
- static void createNamespaceIfAbsent(PulsarResources resources,
NamespaceName namespaceName,
- String cluster) throws IOException {
+ public static void createNamespaceIfAbsent(PulsarResources resources,
NamespaceName namespaceName,
+ String cluster) throws
IOException {
createNamespaceIfAbsent(resources, namespaceName, cluster,
DEFAULT_BUNDLE_NUMBER);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 7e84fa5969a..c1234b6dab2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -164,8 +164,6 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
private final UnloadCounter unloadCounter = new UnloadCounter();
private final SplitCounter splitCounter = new SplitCounter();
- // record load metrics
- private final AtomicReference<List<Metrics>> brokerLoadMetrics = new
AtomicReference<>();
// record unload metrics
private final AtomicReference<List<Metrics>> unloadMetrics = new
AtomicReference();
// record split metrics
@@ -338,7 +336,6 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
- assignCounter.incrementEmpty();
throw new IllegalStateException(
"Failed to select the new owner broker
for bundle: " + bundle);
}
@@ -368,11 +365,17 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return CompletableFuture.completedFuture(brokerLookupData);
}));
});
- future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+ future.whenComplete((r, t) -> {
+ if (t != null) {
+ assignCounter.incrementFailure();
+ }
+ lookupRequests.remove(bundle);
+ }
+ );
return future;
}
- private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle) {
+ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
@@ -633,20 +636,12 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
}
- void updateBrokerLoadMetrics(BrokerLoadData loadData) {
-
this.brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
- }
-
- private void updateUnloadMetrics(UnloadDecision decision) {
- unloadCounter.update(decision);
-
this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress()));
- }
-
public List<Metrics> getMetrics() {
List<Metrics> metricsCollection = new ArrayList<>();
- if (this.brokerLoadMetrics.get() != null) {
- metricsCollection.addAll(this.brokerLoadMetrics.get());
+ if (this.brokerLoadDataReporter != null) {
+ metricsCollection.addAll(brokerLoadDataReporter.generateLoadData()
+ .toMetrics(pulsar.getAdvertisedAddress()));
}
if (this.unloadMetrics.get() != null) {
metricsCollection.addAll(this.unloadMetrics.get());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index e7fb59450b2..ec79698db1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -41,6 +41,7 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +66,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -76,8 +78,6 @@ import
org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
-import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
-import
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -104,7 +104,7 @@ import org.apache.pulsar.metadata.api.extended.SessionEvent;
public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
public static final String TOPIC = TopicName.get(
TopicDomain.persistent.value(),
- NamespaceName.SYSTEM_NAMESPACE,
+ SYSTEM_NAMESPACE,
"loadbalancer-service-unit-state").toString();
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30
* 1000; // 30sec
public static final long VERSION_ID_INIT = 1; // initial versionId
@@ -113,6 +113,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs
to clean immediately
private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS
= 10;
private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
+ private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10
* 60 * 1000;
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
@@ -120,7 +121,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private final String lookupServiceAddress;
private final ConcurrentOpenHashMap<String, CompletableFuture<Void>>
cleanupJobs;
private final StateChangeListeners stateChangeListeners;
- private BrokerSelectionStrategy brokerSelector;
+ private ExtensibleLoadManagerImpl loadManager;
private BrokerRegistry brokerRegistry;
private LeaderElectionService leaderElectionService;
private TableView<ServiceUnitStateData> tableview;
@@ -144,11 +145,15 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private long totalInactiveBrokerCleanupIgnoredCnt = 0;
private long totalInactiveBrokerCleanupCancelledCnt = 0;
private volatile ChannelState channelState;
+ private volatile long lastOwnEventHandledAt = 0;
+ private long lastOwnedServiceUnitCountAt = 0;
+ private int totalOwnedServiceUnitCnt = 0;
public enum EventType {
Assign,
Split,
- Unload
+ Unload,
+ Override
}
@@ -164,7 +169,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
// operation metrics
- final Map<ServiceUnitState, AtomicLong> ownerLookUpCounters;
+ final Map<ServiceUnitState, Counters> ownerLookUpCounters;
final Map<EventType, Counters> eventCounters;
final Map<ServiceUnitState, Counters> handlerCounters;
@@ -207,11 +212,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
this.maxCleanupDelayTimeInSecs = MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
this.minCleanupDelayTimeInSecs = MIN_CLEAN_UP_DELAY_TIME_IN_SECS;
- Map<ServiceUnitState, AtomicLong> tmpOwnerLookUpCounters = new
HashMap<>();
+ Map<ServiceUnitState, Counters> tmpOwnerLookUpCounters = new
HashMap<>();
Map<ServiceUnitState, Counters> tmpHandlerCounters = new HashMap<>();
Map<EventType, Counters> tmpEventCounters = new HashMap<>();
for (var state : ServiceUnitState.values()) {
- tmpOwnerLookUpCounters.put(state, new AtomicLong());
+ tmpOwnerLookUpCounters.put(state, new Counters());
tmpHandlerCounters.put(state, new Counters());
}
for (var event : EventType.values()) {
@@ -267,7 +272,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.warn("Failed to find the channel leader.");
}
this.channelState = LeaderElectionServiceStarted;
- brokerSelector = getBrokerSelector();
+ loadManager = getLoadManager();
if (producer != null) {
producer.close();
@@ -275,6 +280,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.info("Closed the channel producer.");
}
}
+ PulsarClusterMetadataSetup.createNamespaceIfAbsent
+ (pulsar.getPulsarResources(),
NamespaceName.SYSTEM_NAMESPACE, config.getClusterName());
+
producer = pulsar.getClient().newProducer(schema)
.enableBatching(true)
.maxPendingMessages(MAX_OUTSTANDING_PUB_MESSAGES)
@@ -329,9 +337,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
@VisibleForTesting
- protected BrokerSelectionStrategy getBrokerSelector() {
- // TODO: make this selector configurable.
- return new LeastResourceUsageWithWeight();
+ protected ExtensibleLoadManagerImpl getLoadManager() {
+ return ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get());
}
@VisibleForTesting
@@ -465,7 +472,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
ServiceUnitStateData data = tableview.get(serviceUnit);
ServiceUnitState state = state(data);
- ownerLookUpCounters.get(state).incrementAndGet();
+ ownerLookUpCounters.get(state).getTotal().incrementAndGet();
switch (state) {
case Owned -> {
return
CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
@@ -474,16 +481,22 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return
CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
- return deferGetOwnerRequest(serviceUnit).thenApply(
+ return deferGetOwnerRequest(serviceUnit).whenComplete((__, e)
-> {
+ if (e != null) {
+
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
+ }
+ }).thenApply(
broker -> broker == null ? Optional.empty() :
Optional.of(broker));
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
}
case Deleted -> {
+ ownerLookUpCounters.get(state).getFailure().incrementAndGet();
return CompletableFuture.failedFuture(new
IllegalArgumentException(serviceUnit + " is deleted."));
}
default -> {
+ ownerLookUpCounters.get(state).getFailure().incrementAndGet();
String errorMsg = String.format("Failed to process service
unit state data: %s when get owner.", data);
log.error(errorMsg);
return CompletableFuture.failedFuture(new
IllegalStateException(errorMsg));
@@ -522,6 +535,23 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return getOwnerRequest;
}
+ private CompletableFuture<Void> publishOverrideEventAsync(String
serviceUnit,
+ ServiceUnitStateData orphanData,
+ ServiceUnitStateData override) {
+ if (!validateChannelState(Started, true)) {
+ throw new IllegalStateException("Invalid channel state:" +
channelState.name());
+ }
+ EventType eventType = EventType.Override;
+ eventCounters.get(eventType).getTotal().incrementAndGet();
+ return pubAsync(serviceUnit, override).whenComplete((__, e) -> {
+ if (e != null) {
+ eventCounters.get(eventType).getFailure().incrementAndGet();
+ log.error("Failed to override serviceUnit:{} from
orphanData:{} to overrideData:{}",
+ serviceUnit, orphanData, override, e);
+ }
+ }).thenApply(__ -> null);
+ }
+
public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
@@ -658,6 +688,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
stateChangeListeners.notify(serviceUnit, data, null);
if (isTargetBroker(data.dstBroker())) {
log(null, serviceUnit, data, null);
+ lastOwnEventHandledAt = System.currentTimeMillis();
}
}
@@ -1022,33 +1053,34 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
broker, delayInSecs, cleanupJobs.size());
}
- private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData, Set<String> availableBrokers) {
-
- Optional<String> selectedBroker =
brokerSelector.select(availableBrokers, null, getContext());
+ private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData) {
+ Optional<String> selectedBroker = selectBroker(serviceUnit);
if (selectedBroker.isPresent()) {
- var override = new ServiceUnitStateData(Owned,
selectedBroker.get(), true, getNextVersionId(orphanData));
+ var override =
+ new ServiceUnitStateData(Owned, selectedBroker.get(),
true, getNextVersionId(orphanData));
log.info("Overriding ownership serviceUnit:{} from orphanData:{}
to overrideData:{}",
serviceUnit, orphanData, override);
- pubAsync(serviceUnit, override).whenComplete((__, e) -> {
- if (e != null) {
- log.error("Failed to override serviceUnit:{} from
orphanData:{} to overrideData:{}",
- serviceUnit, orphanData, override, e);
- }
- });
+ publishOverrideEventAsync(serviceUnit, orphanData, override)
+ .exceptionally(e -> {
+ log.error(
+ "Failed to override the ownership
serviceUnit:{} orphanData:{}. "
+ + "Failed to publish override event.
totalCleanupErrorCnt:{}",
+ serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet());
+ return null;
+ });
} else {
- log.error("Failed to override the ownership serviceUnit:{}
orphanData:{}. Empty selected broker.",
- serviceUnit, orphanData);
+ log.error("Failed to override the ownership serviceUnit:{}
orphanData:{}. Empty selected broker. "
+ + "totalCleanupErrorCnt:{}",
+ serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet());
}
}
- private void doCleanup(String broker) throws ExecutionException,
InterruptedException, TimeoutException {
+ private void doCleanup(String broker) {
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
- var availableBrokers = new
HashSet<>(brokerRegistry.getAvailableBrokersAsync()
- .get(inFlightStateWaitingTimeInMillis, MILLISECONDS));
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
@@ -1056,13 +1088,13 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
var state = state(stateData);
if (StringUtils.equals(broker, stateData.dstBroker())) {
if (isActiveState(state)) {
- overrideOwnership(serviceUnit, stateData,
availableBrokers);
+ overrideOwnership(serviceUnit, stateData);
orphanServiceUnitCleanupCnt++;
}
} else if (StringUtils.equals(broker, stateData.sourceBroker())) {
if (isInFlightState(state)) {
- overrideOwnership(serviceUnit, stateData,
availableBrokers);
+ overrideOwnership(serviceUnit, stateData);
orphanServiceUnitCleanupCnt++;
}
}
@@ -1097,31 +1129,40 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
- private Optional<ServiceUnitStateData> getRollForwardStateData(
- Set<String> availableBrokers, LoadManagerContext context, long
nextVersionId) {
- Optional<String> selectedBroker =
brokerSelector.select(availableBrokers, null, context);
+ private Optional<String> selectBroker(String serviceUnit) {
+ try {
+ return loadManager.selectAsync(getNamespaceBundle(serviceUnit))
+ .get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
+ } catch (Throwable e) {
+ log.error("Failed to select a broker for serviceUnit:{}",
serviceUnit);
+ }
+ return Optional.empty();
+ }
+
+ private Optional<ServiceUnitStateData> getRollForwardStateData(String
serviceUnit, long nextVersionId) {
+ Optional<String> selectedBroker = selectBroker(serviceUnit);
if (selectedBroker.isEmpty()) {
return Optional.empty();
}
return Optional.of(new ServiceUnitStateData(Owned,
selectedBroker.get(), true, nextVersionId));
}
+
private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
String serviceUnit, ServiceUnitStateData orphanData,
- Set<String> availableBrokers,
- LoadManagerContext context) {
+ Set<String> availableBrokers) {
long nextVersionId = getNextVersionId(orphanData);
var state = orphanData.state();
switch (state) {
case Assigning: {
- return getRollForwardStateData(availableBrokers, context,
nextVersionId);
+ return getRollForwardStateData(serviceUnit, nextVersionId);
}
case Splitting, Releasing: {
if (availableBrokers.contains(orphanData.sourceBroker())) {
// rollback to the src
return Optional.of(new ServiceUnitStateData(Owned,
orphanData.sourceBroker(), true, nextVersionId));
} else {
- return getRollForwardStateData(availableBrokers, context,
nextVersionId);
+ return getRollForwardStateData(serviceUnit, nextVersionId);
}
}
default: {
@@ -1202,25 +1243,27 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
handleBrokerDeletionEvent(inactiveBroker);
}
} else if (!orphanServiceUnits.isEmpty()) {
- var context = getContext();
for (var etr : orphanServiceUnits.entrySet()) {
var orphanServiceUnit = etr.getKey();
var orphanData = etr.getValue();
var overrideData = getOverrideInFlightStateData(
- orphanServiceUnit, orphanData, activeBrokers, context);
+ orphanServiceUnit, orphanData, activeBrokers);
if (overrideData.isPresent()) {
- pubAsync(orphanServiceUnit,
overrideData.get()).whenComplete((__, e) -> {
- if (e != null) {
- log.error("Failed cleaning the ownership
orphanServiceUnit:{}, orphanData:{}, "
- + "cleanupErrorCnt:{}.",
- orphanServiceUnit, orphanData,
- totalCleanupErrorCnt.incrementAndGet() -
totalCleanupErrorCntStart, e);
- }
- });
+ publishOverrideEventAsync(orphanServiceUnit, orphanData,
overrideData.get())
+ .whenComplete((__, e) -> {
+ if (e != null) {
+ log.error("Failed cleaning the ownership
orphanServiceUnit:{}, orphanData:{}, "
+ + "cleanupErrorCnt:{}.",
+ orphanServiceUnit, orphanData,
+
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
+ }
+ });
orphanServiceUnitCleanupCnt++;
} else {
- log.warn("Failed get the overrideStateData from
orphanServiceUnit:{}, orphanData:{}. will retry..",
- orphanServiceUnit, orphanData);
+ log.warn("Failed get the overrideStateData from
orphanServiceUnit:{}, orphanData:{},"
+ + " cleanupErrorCnt:{}. will retry..",
+ orphanServiceUnit, orphanData,
+ totalCleanupErrorCnt.incrementAndGet() -
totalCleanupErrorCntStart);
}
}
}
@@ -1279,6 +1322,25 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
);
}
+ private int getTotalOwnedServiceUnitCnt() {
+ if (tableview == null) {
+ return 0;
+ }
+ long now = System.currentTimeMillis();
+ if (lastOwnEventHandledAt > lastOwnedServiceUnitCountAt
+ || now - lastOwnedServiceUnitCountAt >
MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS) {
+ int cnt = 0;
+ for (var data : tableview.values()) {
+ if (data.state() == Owned && isTargetBroker(data.dstBroker()))
{
+ cnt++;
+ }
+ }
+ lastOwnedServiceUnitCountAt = now;
+ totalOwnedServiceUnitCnt = cnt;
+ }
+ return totalOwnedServiceUnitCnt;
+ }
+
@Override
public List<Metrics> getMetrics() {
@@ -1288,11 +1350,25 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
dimensions.put("broker", pulsar.getAdvertisedAddress());
for (var etr : ownerLookUpCounters.entrySet()) {
- var dim = new HashMap<>(dimensions);
- dim.put("state", etr.getKey().toString());
- var metric = Metrics.create(dim);
- metric.put("brk_sunit_state_chn_owner_lookup_total",
etr.getValue());
- metrics.add(metric);
+ {
+ var dim = new HashMap<>(dimensions);
+ dim.put("state", etr.getKey().toString());
+ dim.put("result", "Total");
+ var metric = Metrics.create(dim);
+ metric.put("brk_sunit_state_chn_owner_lookup_total",
+ etr.getValue().getTotal().get());
+ metrics.add(metric);
+ }
+
+ {
+ var dim = new HashMap<>(dimensions);
+ dim.put("state", etr.getKey().toString());
+ dim.put("result", "Failure");
+ var metric = Metrics.create(dim);
+ metric.put("brk_sunit_state_chn_owner_lookup_total",
+ etr.getValue().getFailure().get());
+ metrics.add(metric);
+ }
}
for (var etr : eventCounters.entrySet()) {
@@ -1371,10 +1447,19 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
metrics.add(metric);
}
+ {
+ var dim = new HashMap<>(dimensions);
+ dim.put("result", "Success");
+ var metric = Metrics.create(dim);
+
metric.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total",
totalInactiveBrokerCleanupCnt);
+ metrics.add(metric);
+ }
+
var metric = Metrics.create(dimensions);
- metric.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total",
totalInactiveBrokerCleanupCnt);
+
metric.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total",
totalOrphanServiceUnitCleanupCnt);
metric.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total",
totalServiceUnitTombstoneCleanupCnt);
+ metric.put("brk_sunit_state_chn_owned_su_total",
getTotalOwnedServiceUnitCnt());
metrics.add(metric);
return metrics;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/AssignCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/AssignCounter.java
index 26ff1f5f401..8e19b7b45e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/AssignCounter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/AssignCounter.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.models;
-import static
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Empty;
+import static
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Failure;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Skip;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Success;
import java.util.ArrayList;
@@ -35,7 +35,7 @@ public class AssignCounter {
enum Label {
Success,
- Empty,
+ Failure,
Skip,
}
@@ -44,7 +44,7 @@ public class AssignCounter {
public AssignCounter() {
breakdownCounters = Map.of(
Success, new AtomicLong(),
- Empty, new AtomicLong(),
+ Failure, new AtomicLong(),
Skip, new AtomicLong()
);
}
@@ -54,8 +54,8 @@ public class AssignCounter {
breakdownCounters.get(Success).incrementAndGet();
}
- public void incrementEmpty() {
- breakdownCounters.get(Empty).incrementAndGet();
+ public void incrementFailure() {
+ breakdownCounters.get(Failure).incrementAndGet();
}
public void incrementSkip() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
index 15bfdc747f1..bd8b3aa6654 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
@@ -98,9 +98,8 @@ public class DefaultNamespaceBundleSplitStrategyImpl
implements NamespaceBundleS
if (!channel.isOwner(bundle)) {
if (debug) {
- log.error(String.format(CANNOT_SPLIT_BUNDLE_MSG
+ log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG
+ " This broker is not the owner.", bundle));
- counter.update(Failure, Unknown);
}
continue;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 502e02e465e..a9b145e3029 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -37,6 +37,7 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -59,7 +60,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import java.net.URL;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -81,13 +81,11 @@ import
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
+import
org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.TableViewImpl;
@@ -97,11 +95,8 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.testcontainers.shaded.org.awaitility.Awaitility;
@@ -122,8 +117,6 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
private PulsarTestContext additionalPulsarTestContext;
- private PulsarResources resources;
-
private ExtensibleLoadManagerImpl primaryLoadManager;
private ExtensibleLoadManagerImpl secondaryLoadManager;
@@ -164,37 +157,6 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
Sets.newHashSet(this.conf.getClusterName()));
}
- protected void beforePulsarStart(PulsarService pulsar) throws Exception {
- if (resources == null) {
- MetadataStoreExtended localStore =
pulsar.createLocalMetadataStore(null);
- MetadataStoreExtended configStore = (MetadataStoreExtended)
pulsar.createConfigurationMetadataStore(null);
- resources = new PulsarResources(localStore, configStore);
- }
- this.createNamespaceIfNotExists(resources,
NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- NamespaceName.SYSTEM_NAMESPACE);
- }
-
- protected void createNamespaceIfNotExists(PulsarResources resources,
- String publicTenant,
- NamespaceName ns) throws
Exception {
- TenantResources tr = resources.getTenantResources();
- NamespaceResources nsr = resources.getNamespaceResources();
-
- if (!tr.tenantExists(publicTenant)) {
- tr.createTenant(publicTenant,
- TenantInfo.builder()
-
.adminRoles(Sets.newHashSet(conf.getSuperUserRoles()))
-
.allowedClusters(Sets.newHashSet(conf.getClusterName()))
- .build());
- }
-
- if (!nsr.namespaceExists(ns)) {
- Policies nsp = new Policies();
- nsp.replication_clusters =
Collections.singleton(conf.getClusterName());
- nsr.createPolicies(ns, nsp);
- }
- }
-
@Override
@AfterClass
protected void cleanup() throws Exception {
@@ -635,8 +597,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@Test
public void testGetMetrics() throws Exception {
{
- var brokerLoadMetrics = (AtomicReference<List<Metrics>>)
- FieldUtils.readDeclaredField(primaryLoadManager,
"brokerLoadMetrics", true);
+ var brokerLoadDataReporter = mock(BrokerLoadDataReporter.class);
+ FieldUtils.writeDeclaredField(primaryLoadManager,
"brokerLoadDataReporter", brokerLoadDataReporter, true);
BrokerLoadData loadData = new BrokerLoadData();
SystemResourceUsage usage = new SystemResourceUsage();
var cpu = new ResourceUsage(1.0, 100.0);
@@ -650,7 +612,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
usage.setBandwidthIn(bandwidthIn);
usage.setBandwidthOut(bandwidthOut);
loadData.update(usage, 1, 2, 3, 4, 5, 6, conf);
-
brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
+ doReturn(loadData).when(brokerLoadDataReporter).generateLoadData();
}
{
var unloadMetrics = (AtomicReference<List<Metrics>>)
@@ -700,8 +662,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
{
AssignCounter assignCounter = new AssignCounter();
assignCounter.incrementSuccess();
- assignCounter.incrementEmpty();
- assignCounter.incrementEmpty();
+ assignCounter.incrementFailure();
+ assignCounter.incrementFailure();
assignCounter.incrementSkip();
assignCounter.incrementSkip();
assignCounter.incrementSkip();
@@ -709,7 +671,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
}
{
-
+ FieldUtils.writeDeclaredField(channel1,
"lastOwnedServiceUnitCountAt", System.currentTimeMillis(), true);
+ FieldUtils.writeDeclaredField(channel1,
"totalOwnedServiceUnitCnt", 10, true);
FieldUtils.writeDeclaredField(channel1,
"totalInactiveBrokerCleanupCnt", 1, true);
FieldUtils.writeDeclaredField(channel1,
"totalServiceUnitTombstoneCleanupCnt", 2, true);
FieldUtils.writeDeclaredField(channel1,
"totalOrphanServiceUnitCleanupCnt", 3, true);
@@ -718,21 +681,21 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
FieldUtils.writeDeclaredField(channel1,
"totalInactiveBrokerCleanupIgnoredCnt", 6, true);
FieldUtils.writeDeclaredField(channel1,
"totalInactiveBrokerCleanupCancelledCnt", 7, true);
- Map<ServiceUnitState, AtomicLong> ownerLookUpCounters = new
LinkedHashMap<>();
+ Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>
ownerLookUpCounters = new LinkedHashMap<>();
Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>
handlerCounters = new LinkedHashMap<>();
Map<ServiceUnitStateChannelImpl.EventType,
ServiceUnitStateChannelImpl.Counters> eventCounters =
new LinkedHashMap<>();
- int i = 1;
int j = 0;
for (var state : ServiceUnitState.values()) {
- ownerLookUpCounters.put(state, new AtomicLong(i));
+ ownerLookUpCounters.put(state,
+ new ServiceUnitStateChannelImpl.Counters(
+ new AtomicLong(j + 1), new AtomicLong(j + 2)));
handlerCounters.put(state,
new ServiceUnitStateChannelImpl.Counters(
new AtomicLong(j + 1), new AtomicLong(j + 2)));
- i++;
j += 2;
}
- i = 0;
+ int i = 0;
for (var type : ServiceUnitStateChannelImpl.EventType.values()) {
eventCounters.put(type,
new ServiceUnitStateChannelImpl.Counters(
@@ -769,22 +732,31 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
dimensions=[{broker=localhost, metric=bundlesSplit,
reason=Bandwidth, result=Success}],
metrics=[{brk_lb_bundles_split_breakdown_total=4}]
dimensions=[{broker=localhost, metric=bundlesSplit,
reason=Admin, result=Success}],
metrics=[{brk_lb_bundles_split_breakdown_total=5}]
dimensions=[{broker=localhost, metric=bundlesSplit,
reason=Unknown, result=Failure}],
metrics=[{brk_lb_bundles_split_breakdown_total=6}]
- dimensions=[{broker=localhost, metric=assign,
result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
+ dimensions=[{broker=localhost, metric=assign,
result=Failure}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
dimensions=[{broker=localhost, metric=assign,
result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}]
dimensions=[{broker=localhost, metric=assign,
result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}]
- dimensions=[{broker=localhost, metric=sunitStateChn,
state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}]
- dimensions=[{broker=localhost, metric=sunitStateChn,
state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=2}]
- dimensions=[{broker=localhost, metric=sunitStateChn,
state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}]
- dimensions=[{broker=localhost, metric=sunitStateChn,
state=Assigning}], metrics=[{brk_sunit_state_chn_owner_lookup_total=4}]
- dimensions=[{broker=localhost, metric=sunitStateChn,
state=Releasing}], metrics=[{brk_sunit_state_chn_owner_lookup_total=5}]
- dimensions=[{broker=localhost, metric=sunitStateChn,
state=Splitting}], metrics=[{brk_sunit_state_chn_owner_lookup_total=6}]
- dimensions=[{broker=localhost, metric=sunitStateChn,
state=Deleted}], metrics=[{brk_sunit_state_chn_owner_lookup_total=7}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Total, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Failure, state=Init}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=2}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Total, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Failure, state=Free}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=4}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Total, state=Owned}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=5}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Failure, state=Owned}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=6}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Total, state=Assigning}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=7}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Failure, state=Assigning}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=8}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Total, state=Releasing}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=9}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Failure, state=Releasing}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=10}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Total, state=Splitting}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=11}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Failure, state=Splitting}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=12}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Total, state=Deleted}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=13}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Failure, state=Deleted}],
metrics=[{brk_sunit_state_chn_owner_lookup_total=14}]
dimensions=[{broker=localhost, event=Assign,
metric=sunitStateChn, result=Total}],
metrics=[{brk_sunit_state_chn_event_publish_ops_total=1}]
dimensions=[{broker=localhost, event=Assign,
metric=sunitStateChn, result=Failure}],
metrics=[{brk_sunit_state_chn_event_publish_ops_total=2}]
dimensions=[{broker=localhost, event=Split,
metric=sunitStateChn, result=Total}],
metrics=[{brk_sunit_state_chn_event_publish_ops_total=3}]
dimensions=[{broker=localhost, event=Split,
metric=sunitStateChn, result=Failure}],
metrics=[{brk_sunit_state_chn_event_publish_ops_total=4}]
dimensions=[{broker=localhost, event=Unload,
metric=sunitStateChn, result=Total}],
metrics=[{brk_sunit_state_chn_event_publish_ops_total=5}]
dimensions=[{broker=localhost, event=Unload,
metric=sunitStateChn, result=Failure}],
metrics=[{brk_sunit_state_chn_event_publish_ops_total=6}]
+ dimensions=[{broker=localhost, event=Override,
metric=sunitStateChn, result=Total}],
metrics=[{brk_sunit_state_chn_event_publish_ops_total=7}]
+ dimensions=[{broker=localhost, event=Override,
metric=sunitStateChn, result=Failure}],
metrics=[{brk_sunit_state_chn_event_publish_ops_total=8}]
dimensions=[{broker=localhost, event=Init,
metric=sunitStateChn, result=Total}],
metrics=[{brk_sunit_state_chn_subscribe_ops_total=1}]
dimensions=[{broker=localhost, event=Init,
metric=sunitStateChn, result=Failure}],
metrics=[{brk_sunit_state_chn_subscribe_ops_total=2}]
dimensions=[{broker=localhost, event=Free,
metric=sunitStateChn, result=Total}],
metrics=[{brk_sunit_state_chn_subscribe_ops_total=3}]
@@ -803,7 +775,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
dimensions=[{broker=localhost, metric=sunitStateChn,
result=Skip}],
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=6}]
dimensions=[{broker=localhost, metric=sunitStateChn,
result=Cancel}],
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=7}]
dimensions=[{broker=localhost, metric=sunitStateChn,
result=Schedule}],
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}]
- dimensions=[{broker=localhost, metric=sunitStateChn}],
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1,
brk_sunit_state_chn_orphan_su_cleanup_ops_total=3,
brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
+ dimensions=[{broker=localhost, metric=sunitStateChn,
result=Success}],
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1}]
+ dimensions=[{broker=localhost, metric=sunitStateChn}],
metrics=[{brk_sunit_state_chn_orphan_su_cleanup_ops_total=3,
brk_sunit_state_chn_owned_su_total=10,
brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
""".split("\n"));
var actual = primaryLoadManager.getMetrics().stream().map(m ->
m.toString()).collect(Collectors.toSet());
assertEquals(actual, expected);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 9eda98e5d84..2d98bc5cae6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -75,11 +75,11 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
-import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
@@ -116,7 +116,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
private BrokerRegistryImpl registry;
- private BrokerSelectionStrategy brokerSelector;
+ private ExtensibleLoadManagerImpl loadManager;
@BeforeClass
@Override
@@ -135,7 +135,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
loadManagerContext = mock(LoadManagerContext.class);
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore();
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore();
- brokerSelector = mock(BrokerSelectionStrategy.class);
+ loadManager = mock(ExtensibleLoadManagerImpl.class);
additionalPulsarTestContext =
createAdditionalPulsarTestContext(getDefaultConf());
pulsar2 = additionalPulsarTestContext.getPulsarService();
@@ -496,7 +496,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(0, getOwnerRequests2.size());
// recovered, check the monitor update state : Assigned -> Owned
-
doReturn(Optional.of(lookupServiceAddress1)).when(brokerSelector).select(any(),
any(), any());
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))).when(loadManager).selectAsync(any());
FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1 , true);
@@ -714,7 +714,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle1);
var owner2 = channel2.getOwnerAsync(bundle2);
-
doReturn(Optional.of(lookupServiceAddress2)).when(brokerSelector).select(any(),
any(), any());
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
assertTrue(owner1.get().isEmpty());
assertTrue(owner2.get().isEmpty());
@@ -1076,7 +1076,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
-
doReturn(Optional.of(lookupServiceAddress2)).when(brokerSelector).select(any(),
any(), any());
+
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
// channel1 is broken. the assign won't be complete.
waitUntilState(channel1, bundle);
@@ -1418,11 +1418,12 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
var ownerLookUpCounters =
- (Map<ServiceUnitStateChannelImpl.EventType, AtomicLong>)
+ (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
FieldUtils.readDeclaredField(channel,
"ownerLookUpCounters", true);
for(var val : ownerLookUpCounters.values()){
- val.set(0);
+ val.getFailure().set(0);
+ val.getTotal().set(0);
}
}
@@ -1518,20 +1519,20 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
)
throws IllegalAccessException {
var ownerLookUpCounters =
- (Map<ServiceUnitState, AtomicLong>)
+ (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
FieldUtils.readDeclaredField(channel,
"ownerLookUpCounters", true);
Awaitility.await()
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> { // wait until true
- assertEquals(assigned,
ownerLookUpCounters.get(Assigning).get());
- assertEquals(owned, ownerLookUpCounters.get(Owned).get());
- assertEquals(released,
ownerLookUpCounters.get(Releasing).get());
- assertEquals(splitting,
ownerLookUpCounters.get(Splitting).get());
- assertEquals(free, ownerLookUpCounters.get(Free).get());
- assertEquals(deleted,
ownerLookUpCounters.get(Deleted).get());
- assertEquals(init, ownerLookUpCounters.get(Init).get());
+ assertEquals(assigned,
ownerLookUpCounters.get(Assigning).getTotal().get());
+ assertEquals(owned,
ownerLookUpCounters.get(Owned).getTotal().get());
+ assertEquals(released,
ownerLookUpCounters.get(Releasing).getTotal().get());
+ assertEquals(splitting,
ownerLookUpCounters.get(Splitting).getTotal().get());
+ assertEquals(free,
ownerLookUpCounters.get(Free).getTotal().get());
+ assertEquals(deleted,
ownerLookUpCounters.get(Deleted).getTotal().get());
+ assertEquals(init,
ownerLookUpCounters.get(Init).getTotal().get());
});
}
@@ -1565,7 +1566,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
doReturn(loadManagerContext).when(channel).getContext();
doReturn(registry).when(channel).getBrokerRegistry();
- doReturn(brokerSelector).when(channel).getBrokerSelector();
+ doReturn(loadManager).when(channel).getLoadManager();
var leaderElectionService = new LeaderElectionService(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
index 0aa055b58ac..8c765e7a3df 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
@@ -169,7 +169,7 @@ public class DefaultNamespaceBundleSplitStrategyTest {
var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
var expected = Set.of();
assertEquals(actual, expected);
- verify(counter, times(2)).update(eq(SplitDecision.Label.Failure),
eq(Unknown));
+ verify(counter, times(0)).update(eq(SplitDecision.Label.Failure),
eq(Unknown));
}
public void testError() throws Exception {