HQebupt opened a new issue, #16274:
URL: https://github.com/apache/pulsar/issues/16274
## Motivation
The modular load manager, implemented in ModularLoadManagerImpl, is a
flexible alternative to the previously implemented load manager, which attempts
to simplify how load is managed while also providing abstractions so that
complex load management strategies may be implemented.
The load management component determines the criteria for unloading bundles
and contains the following load shedding strategy: OverloadShedder and
ThresholdShedder and UniformLoadShedder. (default is ThresholdShedder since
2.10.0)
- OverloadShedder: This strategy attempts to shed exactly one bundle on
brokers which are overloaded
- ThresholdShedder: This strategy unloads any broker that exceeds the
average resource utilization of all brokers by a configured threshold.
- UniformLoadShedder:This strategy tends to distribute load uniformly across
all brokers.
However, the bundle placement strategy contains only one:
`LeastLongTermMessageRate`, which selects a broker based on which one has the
least long term message rate.
The load management in our pulsar cluster use `ThresholdShedder` as load
shedding strategy, and use LeastLongTermMessageRate as bundle placement
strategy, which does not work well.
Some broker nodes have a high load when the traffic of some topics are
relatively large. The load shedding strategy will unload some bundles in any
broker that exceeds the average resource utilization of all brokers by a
configured threshold. And the bundles will be transferred to the next broker
node. However it causes the load of the next broker node exceed the average
resource utilization. Therefore, the load balancing will occur again on the
current broker node due to high load. Worse yet, this scenario keeps popping up.
The load shedding strategy configuration is as follows
```
# load shedding strategy, support OverloadShedder and ThresholdShedder,
default is OverloadShedder
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
# The broker resource usage threshold.
# When the broker resource usage is greater than the pulsar cluster average
resource usage,
# the threshold shedder will be triggered to offload bundles from the broker.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBrokerThresholdShedderPercentage=10
# When calculating new resource usage, the history usage accounts for.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerHistoryResourcePercentage=0.9
# The BandWithIn usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0
# The BandWithOut usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0
# The CPU usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0
# The heap memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=0.1
# The direct memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=0.1
# Bundle unload minimum throughput threshold (MB), avoiding bundle unload
frequently.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=0.1
```
The following screenshots are the status of the cluster:
Problem 1. Load balancing took a long time 10 hours and over 400 times, and
it has been unloading if there is a large traffic.
<img width="1247" alt="image"
src="https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png">
<img width="1245" alt="image"
src="https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png">
Problem 2. The effect of cpu balancing is poor.
<img width="1247" alt="image"
src="https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png">
<img width="1246" alt="image"
src="https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png">
The load shedding strategy `ThresholdShedder` work well, but not the bundle
placement strategyLeastLongTermMessageRate .
There are 3 possible reasons for the problems.
1. Although the cluster has many brokers with low load, there are fewer
brokers to be considered for assignment.
<img width="1168" alt="image"
src="https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png">
Some brokers with lower load but more bundles can not be candidate due to
distributing bundles evenly in LoadManager by force. Most of brokers are
filtered out by the strategy, only 1 or 2 brokers can be candidate in the total
136 brokers . It was fixed by #16059
2. The memory usage of Java programs fluctuates widely, so that the maximum
resource usage calculated is based on memory usage most of the time, which
filters out brokers with low CPU load. Below is the sample of two brokers jvm
memory usage in the cluster.
<img width="1249" alt="image"
src="https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png">
If the broker is overload, it will get highest score, which prevents it from
being a candidate.
<img width="1059" alt="image"
src="https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png">
3. The bundle placement strategy is LeastLongTermMessageRate, which selects
a broker based on which one has the least long term message rate instead of
load metric. The `LeastLongTermMessageRate` does not get along with
`ThresholdShedder` well. Therefore, a load-based bundle placement strategy is
necessary to cooperate with `ThresholdShedder`.
### Current implementation details
The `ThresholdShedder` strategy that unloads any broker that exceeds the
average resource utilization of all brokers by a configured threshold. As a
consequence, this strategy tends to distribute load among all brokers. It does
this by first computing the average resource usage per broker for the whole
cluster. The resource usage for each broker is calculated using the following
method: LocalBrokerData#getMaxResourceUsageWithWeight). The weights for each
resource are configurable. Historical observations are included in the running
average based on the broker's setting for
loadBalancerHistoryResourcePercentage. Once the average resource usage is
calculated, a broker's current/historical usage is compared to the average
broker usage. If a broker's usage is greater than the average usage per broker
plus the loadBalancerBrokerThresholdShedderPercentage, this load shedder
proposes removing enough bundles to bring the unloaded broker 5% below the
current average broker usage. Note that
recently unloaded bundles are not unloaded again.
## Goal
Develop a new load-based bundle placement strategy for better load balancing
with fewer times, and less time, which cab achieve better teamwork with
`ThresholdShedder`.
## API Changes
No user-facing API changes are required.
## Implementation
This should be a detailed description of all the changes that are
expected to be made. It should be detailed enough that any developer that is
familiar with Pulsar internals would be able to understand all the parts of
the
code changes for this proposal.
This should also serve as documentation for any person that is trying to
understand or debug the behavior of a certain feature.
The main idea of the new strategy is to unify the requirement of load
shedding strategy and bundle placement strategy, which consider the resource
usage with weight, including historical observations.
How to calculate a score for a broker ?
- use its historical load and short-term load data with weight.
How to select a broker for assignning bundle ?
- select a broker based on which one has the least resource usage with
weight.
### New configuration options
The existing cache implementation will not be removed at this point. Users
will
be able to configure the old implementation in broker.conf.
This option will be helpful in case of performance regressions would be seen
for
some use cases with the new strategy implementation.
```
# load assignment strategy, support LeastLongTermMessageRate and
LeastResourceUsageWithWeight, default is LeastLongTermMessageRate
loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight
```
Below are screenshots of the effect of the new strategy with less time and
fewer load balancing times.
<img width="1593" alt="image"
src="https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png">
<img width="1586" alt="image"
src="https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png">
## Reject Alternatives
None yet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]