heesung-sn opened a new issue, #16691:
URL: https://github.com/apache/pulsar/issues/16691
<!---
Instructions for creating a PIP using this issue template:
1. The author(s) of the proposal will create a GitHub issue ticket using
this template.
(Optionally, it can be helpful to send a note discussing the proposal to
[email protected] mailing list before submitting this GitHub issue.
This discussion can
help developers gauge interest in the proposed changes before
formalizing the proposal.)
2. The author(s) will send a note to the [email protected] mailing list
to start the discussion, using subject prefix `[PIP] xxx`. To determine
the appropriate PIP
number `xxx`, inspect the mailing list
(https://lists.apache.org/[email protected])
for the most recent PIP. Add 1 to that PIP's number to get your PIP's
number.
3. Based on the discussion and feedback, some changes might be applied by
the author(s) to the text of the proposal.
4. Once some consensus is reached, there will be a vote to formally approve
the proposal. The vote will be held on the [email protected]
mailing list. Everyone
is welcome to vote on the proposal, though it will considered to be
binding
only the vote of PMC members. It will be required to have a lazy
majority of
at least 3 binding +1s votes. The vote should stay open for at least 48
hours.
5. When the vote is closed, if the outcome is positive, the state of the
proposal is updated and the Pull Requests associated with this proposal
can
start to get merged into the master branch.
-->
# Proposal: New Pulsar Broker Load Balancer
## Motivation
As previously shared with the community, we observed many improvement areas
around the Pulsar load
balancer[[1]](https://docs.google.com/document/d/1nXaiSK39E10awqinnDUX5Sial79FSTczFCmMdc9q8o8/edit#heading=h.3mmoccnu55o8).
Since the improvement requires significant changes, first, we would like to
share the overall goals for this project and the high-level components to
design. This doc will highlight the architecture of the new broker load
balancer.
## Goals
We set up the project goals in the following areas.
### User-facing goals
#### Logic
- Balance cluster utilization as uniform as possible with minimal delays
#### Logs / Metrics
- Show transparent load balance decisions with logs and metrics.
#### Admin API / Configurations
- Provide ways to override system decisions.
- Reduce the number of configurations to tune.
- Provide the better configuration default values with explanations.
- (second-phase) Provide ways to set custom load balance strategy.
### Internal Implementation goals
#### Logic
- Keep the three major load balance logics but make them more efficient and
faster. We will discuss detailed algorithm improvements separately.
- Topic(Bundle)-broker-assignment: improve the randomization and
assignment distribution
- Bundle-split: revisit the current threshold-based strategy
- Bundle-unload: revisit the frequency of unloading
#### Implementation
- Distribute bundle-broker assignment and bundle-split decisions to local
brokers.
- Synchronize bundle unloading decisions by the leader broker.
- Reduce load data replication among brokers.
- Replace load data’s metadata stores with topic table-views.
- Remove the client dependency in the load balance logic.
- Remove the client redirection in the assignment logic.
- Add the bundle transfer option in the unload logic instead of relying
on clients’ broker discovery calls
- Minimize the topic unavailability from bundle unloading with the bundle
transfer option
- Introduce the bundle state channel(table-view) to make bundle load balance
operations consistent and fault-tolerant among brokers.
- Isolate the new load balancer code in new classes.
- Replace the bundle ownership metadata store(ZK znodes) with the bundle
state channel.
#### Logs / Metrics
- Add meaningful logs and metrics for all major load balance events.
- Add documentation about how to read load balancer metrics and logs.
#### Admin API / Configurations
- Add an admin CLI to transfer a bundle to a specific broker.
- Add necessary configurations to override the load balance decisions.
- Dynamically adjust internal configuration thresholds based on the load
data.
- Make the admin APIs fault-tolerant and easy to monitor.
#### Testing
- Document the testing plan and coverage status.
- Add unit tests for happy and unhappy cases.
- Add global load balance logic tests, and compare the current load manager
with the new load manager.
## API Changes
N/A
## Implementation (High-Level Components)
### New Load Manager
- It refactors the existing load balance logic with better modularity.
- It isolates the new code in the new classes without breaking the existing
logic.
- This new load manager will be disabled in the first releases until proven
stable.
### Load Data Models
##### LocalBrokerData: broker’s factual data
- e.g.) {webServiceUrl, pulsarServiceUrl, …}
- Persisted in MetadataStore(ZK)
#### BrokerLoadData: broker’s load data
- e.g.) {cpu, memory, io, msgIn/Out, ...}
- Published in BrokerLoadDataStore(TableView)
#### BundlesLoadData: bundle’s load data
- e.g.) { bundleName, msgIn/Out, ...}
- Cached in the local broker only
#### TopBundlesLoadData: top-n high-loaded bundle load data from the broker
- e.g.) {brokerUrl, high_load_bundles :[{bundleName, …}], …}
- Published in TopBundlesLoadDataStore(TableView)
### Load Data Write-Read Flow
#### LocalBrokerData
Write:
- Upon init, each broker stores LocalBrokerData in its ephemeral znode in
MetaDataStore(ZK) to monitor live brokers(same as now)
- The broker-level load data moved to BrokerLoadData
Read:
- All brokers check LocalBrokerData to confirm the list of live brokers.
#### BrokerLoadData
Write:
- Each broker periodically computes local load(BrokerLoadData) and publishes
it to BrokerLoadDataStore(TableView)(non-persistent)
- Because non-persistent TableView can often lose data, we will add a
TTL policy to tombstone old KVs in BrokerLoadDataStore.
Read:
- All brokers consume BrokerLoadDataStore
- With aggregated BrokerLoadData, all brokers perform bundle assignments
without going through the leader.
#### BundlesLoadData
Write:
- Each broker monitors the allocated bundles' load and stores them in the
local cache BundlesLoadData(In-memory-HashMap).
- BundlesLoadData will not be replicated to other brokers’ caches.
Read:
- Each broker locally reads BundlesLoadData and computes top n high load
bundles, TopBundlesLoadData.
- With the local BundlesLoadData, all brokers perform bundle splits without
going through the leader.
#### TopBundlesLoadData
Write:
- Each broker periodically compute TopBundlesLoadData and publishes it to
TopBundlesLoadDataStore(TableView)(non-persistent)
- We will add a TTL policy to tombstone old KVs in
TopBundlesLoadDataStore.
Read:
- Only the leader broker consumes TopBundlesLoadDataStore
- With the aggregated TopBundlesLoadData and BrokerLoadData, the leader
initiates bundle unload(transfer) operations.
#### Load Data Flow

### Major Modifications on Bundle Split, Unload, and Assignment Flow
- With the local BundlesLoadData, all brokers perform bundle splits without
going through the leader. By default, newly split bundles will be the target to
unload(transfer).
- With aggregated BrokerLoadData, all brokers perform bundle assignments
without going through the leader.
- With aggregated TopBundlesLoadData and BrokerLoadData, the leader makes
decisions to unload(transfer) bundles.
- We will add a new bundle unload option, transfer, which transfers bundles
from one broker to another.
- We will introduce a global channel(Bundle State Channel) to share
consistent/linearized bundle state changes with brokers.
### Bundle State Channel
This bundle state channel is a persistent topic table-view used as a WAL to
broadcast the total order of all bundle state changes in the cluster. All
brokers will asynchronously consume messages in this channel in the same order
and react to bundle state changes(sequential consistency). With the table-view
compaction, the bundle state channel will eventually materialize the current
bundle-broker ownership. Read operations on this channel can be deferred(e.g.,
clients’ topic lookup requests) in a few seconds, depending on the current
state of the bundle.
#### Bundle State Lifecycles
We define the following states and actions and linearize the bundle state
changes.
(This is a high-level design to explain the concept here. The final version
may differ.)

Bundle Actions
- Own: Own the bundle ownership
- The owner broker is selected by the local load manager.
- Transfer: Transfer the bundle ownership to the destination broker.
- The source broker internally disables the bundle ownership.
- The destination broker owns the bundle.
- Return: Return deferred client connections with the destination broker URL
- Close the connections if already being served
- Split: Split the target(parent) bundle into child bundles.
- Create: Create the child bundle entries in the channel, initially assigned
to the local broker.
- Discard: Discard the bundle entry in the channel(tombstone operation)
- Unload: Unload the bundle ownership from the owner broker
- Disable the bundle ownership
- Close client connections under the bundle
- Run the Discard action
Bundle States
- Assigned: assigned to a broker
- Assigning: in the process of assigning the ownership
- Splitting: in the process of splitting a bundle range.
- Unassigned: unassigned to any broker (removed from the channel)
*New client connections to the bundle are deferred(with timeouts) in the
Assigning state.
#### Bundle State Change Examples
The bundle state channel can be used like the followings.
##### Bundle Transfer Example
(State, Action) Sequence:
(Assigned, Transfer) => (Assigning, Return) => (Assigned,)
1. The leader finds target bundles from TopBundlesLoadData and initiates a
bundle unload(a transfer) by broadcasting the unload state change to the bundle
state channel, keyed by the bundleName.
e.g. {key:bundleName, value:{flow:transfer, action:transfer,
state:assigning, from:A, to:B}}}
2. All brokers will consume the state change message in the channel.
3. Upon consuming the message from the channel, if any state change involves
the local broker, the broker performs its role and updates the state back in
the channel to continue the state change. If there are conflicting state
changes with the ongoing one, ignore them.
4. Meanwhile, if other brokers(broker C) receive lookup requests for the
bundle, the client's connections will be deferred(with timeouts) until they
receive the “Return” action. When the “Return” action is broadcasted, all
brokers will return the pending connections with the owner broker’s URL. Also,
the existing connections from the source broker will be closed.
##### Bundle Split Example
(State, Action) Sequence:
(Assigned, Split) => (Splitting, Unload | Create) => {(Unassigned, ) |
(Assigned, ), (Assigned, )}
1. Each owner broker monitors local BundlesLoadData and initiates a bundle
split by broadcasting the transfer state change to the bundle state channel,
keyed by the bundleName.
e.g. {key:bundleName, value:{flow: split, action:split, state:
splitting, from: A, to: B, transfer: true}}}
2. Same as Bundle Transfer Example step 2.
3. Same as Bundle Transfer Example step 3.
a. After the “Split,” the owner broadcasts the children bundles’
ownership creation(state=assigned) and the parent bundle’s ownership
unload(empty message).
b. By default, the owner publishes a message to the TopBundlesLoadData
store asking the leader to unload(or transfer) the children bundles.
##### Bundle Assignment Example
(State, Action) Sequence:
(Unassigned, Own) => (Assigning, Return) => (Assigned,)
1. When requested by clients, the first connected brokers check if any
broker in the state channel owns the bundle. Return the owner broker URL if
found. Else, initiate a bundle assignment by broadcasting the assignment state
change.
e.g. {key:bundleName, value:{flow: assignment, action:own,
state:assigning, to: B}}}
2. Same as Bundle Transfer Example step 2.
3. Same as Bundle Transfer Example step 3.
4. Same as Bundle Transfer Example step 4.
#### Bundle-Broker Ownership State
Because the bundle state channel shows the current bundle-broker ownership,
we can remove the redundant bundle ownership store(ZK znodes). Each broker will
look up the bundle ownership channel to check which broker currently owns the
requested bundles or is in the ownership assignment/unload(transfer) process.
Besides, before return, the broker availability metadata store(LocalBrokerData
znode existence) could be checked to confirm the owner brokers' availability
further.
#### Bundle State Channel Owner Selection and Discovery
Bundle State Channel(BSC) is another topic, and because of its circular
dependency, we can't use the BundleStateChannel to find the owner broker of the
BSC topic. For example, when a cluster starts, each broker needs to initiate
BSC TopicLookUp(to find the owner broker) in order to consume the messages in
BSC. However, initially, each broker does not know which broker owns the BSC.
The ZK leader election can be a good option to break this circular
dependency, like the followings.
##### Channel Owner Selection
The cluster can use the ZK leader election to select the owner broker. If
the owner becomes unavailable, one of the followers will become the new owner.
We can elect the owner for each bundle state partition.
##### Channel Owner Discovery
Then, in brokers’ TopicLookUp logic, we will add a special case to return
the current leader(the elected BSC owner) for the BSC topics.
#### Conflict State Resolution(Race Conditions)
Without distributed locks, we can resolve conflicting state changes by a
conflict state resolution algorithm in an optimistic and eventual manner.
Brokers can take the first valid state change in the linearized view as the
winner state and ignore the later ones.
One caveat is that because the current table-view compaction takes only the
last ones as the result values, we need to introduce an internal compaction
algo for this channel to follow the conflict resolution algorithm(the first
valid state change as the result value).
```
Bundle State Conflict Resolution Algorithm Example
For each bundle:
// A[i] is a linearized bundle state change action at i, and
// S is the current bundle state after A[i-1],
// where the sequence number i monotonically increases.
for each A[i] and S:
// no arrows in the state diagram
If A[i] is invalid from S:
Reject A[i]
Else: Accept A[i]
```
For instance, let’s say for bundle x, there are two conflicting assignments
initiated. The linearized state change messages will be like the following.
(own, to:B), (own, to:A)
By the conflict resolution algorithm, the second state change (own, to:A)
will be ignored by all brokers(and by the compaction algorithm). Eventually,
the “return” message will be broadcasted by declaring that the owner is “B.”
(own, to:B), (own, to:A), (return, to:B)
Let’s take another example. Let’s say bundle x is already assigned to broker
B, but another broker initiates the “own” action(before consuming the “return”
action). This last “own” state change will be ignored since this action “own”
is invalid from the previous state “assigned.” (in the above state diagram,
there is no “own” action arrow from the “assigned” state.)
(own, to:B), (return, to:B), (own, to:A)
#### Failure Recovery
##### When a broker is down
When state change participants(brokers) are suddenly unavailable, the state
change could become an orphan, as the participants do not play the role. For
these orphan state changes, the leader broker will run orphan state clean-up
logic. For instance, the leader can add the bundle state clean-up logic in the
broker unavailability notification handler(znode watcher) in order to clean the
pending bundle state changes and ownerships from unavailable brokers. Also, to
make the clean-up logic further fault-tolerant, the leader broker will run the
clean-up function when it initializes. Additionally, we could make the leader
periodically call the clean-up in a separate monitor thread(we shouldn’t
redundantly call this cleanup too often).
##### When the entire ZK is down and comes back
Every broker will be notified when its ZK session undergoes the connection
issue. Then, the brokers will be in the "safe" mode, serving the existing
topics as-is, but not allowing the ZK-related operations. The leader won't run
the bundle cleanup, transfer, nor unload logic in this case when it knows ZK is
down.
When ZK comes back, each broker will know ZK sessions are re-established.
They will wait 2-3 mins for all brokers to complete the ZK hand-shaking. Then,
they will recover the bundle state table-view and return to the normal mode.
#### Bundle State and Load Data TableView Scalability
Expected read/write traffic:
Write: there will be relatively fewer messages from the write path with
occasional spikes
Read: the fan-out broadcast could cause bottlenecks when the cluster is
enormous.
This bundle state channel is relatively lightweight from the producers
because bundle state change is relatively less frequent. Still, message
dispatch to consumers could be heavier if the cluster is very large. The same
issue can happen to other table-views(BrokerLoadDataStorage) introduced in this
proposal. We could consider the following methods to scale the table views’
produce/consume rates in a large cluster.
##### Split Broker Cluster to multiple clusters
Simply, one can split a massive broker cluster into multiple clusters with
different endpoints. The bookkeeper and configuration layer can be shared among
the broker clusters.
##### Partitioned Table-View (short-term)
One can make the table views based on partitioned topics. Then, we can
distribute message load to multiple partition owner brokers.
##### Sharding (long-term)
As the conventional scalability method, one could shard the cluster to
multiple groups of brokers. Then, we can create a separate channel for each
shard of brokers. This means we need an additional discovery layer to map
topics to broker shards(also need to align with Namespace Isolation Policies)
We need to mention that this metadata sync scalability issue is not new in
Pulsar, as the current Pulsar uses n-replication. For instance, all brokers'
and all bundles' load metadata are replicated to all brokers via ZK watchers.
Currently, distributed ZK servers send znode watch notifications to its
clients(brokers). In this proposal, multiple table-view owner brokers(with
partitioned table-views) can dispatch metadata change messages to the
participants(brokers).
We think this metadata sync scalability is relatively low-priority, as only
a few customers run Pulsar clusters on such a large scale. We could ask the
customers first to split the cluster into multiple clusters and then enable
partitioned table views. It is not practical for a single cluster to have
thousands of brokers. However, we still want to ensure this design is
seamlessly extensible, as a two-way-door decision.
## Reject Alternatives
N/A
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]