heesung-sn opened a new issue, #16691:
URL: https://github.com/apache/pulsar/issues/16691

   
   # Proposal: New Pulsar Broker Load Balancer
   
   ## Motivation 
   As previously shared with the community, we have observed many areas for improvement around the Pulsar load balancer[[1]](https://docs.google.com/document/d/1nXaiSK39E10awqinnDUX5Sial79FSTczFCmMdc9q8o8/edit#heading=h.3mmoccnu55o8). Since these improvements require significant changes, we would first like to share the overall goals for this project and the high-level components to design. This doc highlights the architecture of the new broker load balancer.
   
   ## Goals
   We set up the project goals in the following areas.
   
   ### User-facing goals
   
   #### Logic
   - Balance cluster utilization as uniformly as possible with minimal delay
   
   #### Logs / Metrics
   - Show transparent load balance decisions with logs and metrics.
   
   #### Admin API / Configurations
   - Provide ways to override system decisions.
   - Reduce the number of configurations to tune.
   - Provide better configuration default values with explanations.
   - (second-phase) Provide ways to set custom load balance strategy.
   
   ### Internal Implementation goals
   
   #### Logic
   - Keep the three major load balance operations but make them more efficient and faster. We will discuss detailed algorithm improvements separately.
       - Topic(Bundle)-broker-assignment: improve the randomization and 
assignment distribution
       - Bundle-split: revisit the current threshold-based strategy
       - Bundle-unload: revisit the frequency of unloading
   
   #### Implementation
   - Distribute bundle-broker assignment and bundle-split decisions to local 
brokers.
   - Synchronize bundle unloading decisions by the leader broker.
   - Reduce load data replication among brokers.
   - Replace load data’s metadata stores with topic table-views.
   - Remove the client dependency in the load balance logic.
       - Remove the client redirection in the assignment logic.
       - Add the bundle transfer option in the unload logic instead of relying on clients’ broker discovery calls.
   - Minimize the topic unavailability from bundle unloading with the bundle transfer option.
   - Introduce the bundle state channel (table-view) to make bundle load balance operations consistent and fault-tolerant among brokers.
   - Isolate the new load balancer code in new classes.
   - Replace the bundle ownership metadata store (ZK znodes) with the bundle state channel.
   
   #### Logs / Metrics
   - Add meaningful logs and metrics for all major load balance events.
   - Add documentation about how to read load balancer metrics and logs.
   
   #### Admin API  / Configurations
   - Add an admin CLI to transfer a bundle to a specific broker.
   - Add necessary configurations to override the load balance decisions.
   - Dynamically adjust internal configuration thresholds based on the load 
data.
   - Make the admin APIs fault-tolerant and easy to monitor.
   
   #### Testing
   - Document the testing plan and coverage status.
   - Add unit tests for happy and unhappy cases.
   - Add global load balance logic tests, and compare the current load manager 
with the new load manager.
   
   ## API Changes
   N/A
   
   ## Implementation (High-Level Components)
   
   ### New Load Manager
   - It refactors the existing load balance logic with better modularity.
   - It isolates the new code in new classes without breaking the existing logic.
   - The new load manager will be disabled in the first releases until proven stable.
   
   ### Load Data Models
   #### LocalBrokerData: broker’s factual data
   - e.g.) {webServiceUrl, pulsarServiceUrl, …}
   - Persisted in MetadataStore (ZK)
   
   #### BrokerLoadData: broker’s load data
   - e.g.) {cpu, memory, io, msgIn/Out, ...}
   - Published in BrokerLoadDataStore (TableView)

   #### BundlesLoadData: bundle’s load data
   - e.g.) {bundleName, msgIn/Out, ...}
   - Cached in the local broker only

   #### TopBundlesLoadData: top-n high-loaded bundle load data from the broker
   - e.g.) {brokerUrl, high_load_bundles: [{bundleName, …}], …}
   - Published in TopBundlesLoadDataStore (TableView)
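
   For illustration, these models could be shaped roughly as follows (a minimal Java sketch; the field names follow the examples above and are not final):

   ```java
   import java.util.List;

   // Illustrative shapes only; the final classes and fields may differ.
   class LocalBrokerData {        // persisted in MetadataStore (ZK)
       String webServiceUrl;
       String pulsarServiceUrl;
   }

   class BrokerLoadData {         // published to BrokerLoadDataStore (TableView)
       double cpu;
       double memory;
       double io;
       double msgThroughputIn;
       double msgThroughputOut;
   }

   class BundleLoadData {         // cached only in the local broker
       String bundleName;
       double msgThroughputIn;
       double msgThroughputOut;
   }

   class TopBundlesLoadData {     // published to TopBundlesLoadDataStore (TableView)
       String brokerUrl;
       List<BundleLoadData> highLoadBundles;
   }
   ```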
   
   ### Load Data Write-Read Flow
   #### LocalBrokerData 
   
   Write:
   - Upon init, each broker stores LocalBrokerData in its ephemeral znode in MetadataStore (ZK) to monitor live brokers (same as now).
   - The broker-level load data is moved to BrokerLoadData.
   
   Read:
   - All brokers check LocalBrokerData to confirm the list of live brokers.
   
   #### BrokerLoadData
   Write:
   - Each broker periodically computes its local load (BrokerLoadData) and publishes it to BrokerLoadDataStore (TableView, non-persistent).
       - Because a non-persistent TableView can lose data, we will add a TTL policy to tombstone old KVs in BrokerLoadDataStore.
   
   Read:
   - All brokers consume BrokerLoadDataStore.
   - With the aggregated BrokerLoadData, all brokers perform bundle assignments without going through the leader.
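
   As a concrete illustration, the write and read paths could use the Pulsar client’s producer and TableView APIs roughly as follows (a minimal sketch; the topic name and the key format are assumptions, and BrokerLoadData is the illustrative class from above):

   ```java
   import org.apache.pulsar.client.api.*;

   public class BrokerLoadDataFlowSketch {
       // Hypothetical system topic backing BrokerLoadDataStore.
       static final String TOPIC = "non-persistent://pulsar/system/broker-load-data";

       void publishAndConsume(PulsarClient client, String localBrokerId,
                              BrokerLoadData localLoad) throws PulsarClientException {
           // Write path: each broker periodically publishes its load, keyed by broker id.
           Producer<BrokerLoadData> producer = client
                   .newProducer(Schema.JSON(BrokerLoadData.class))
                   .topic(TOPIC)
                   .create();
           producer.newMessage()
                   .key(localBrokerId) // e.g. "broker-1:8080" (assumed key format)
                   .value(localLoad)
                   .send();

           // Read path: every broker materializes the latest load of all brokers.
           TableView<BrokerLoadData> brokerLoadDataStore = client
                   .newTableView(Schema.JSON(BrokerLoadData.class))
                   .topic(TOPIC)
                   .create();
           brokerLoadDataStore.forEach((brokerId, load) -> {
               // feed the aggregated view into the local assignment logic
           });
       }
   }
   ```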
   
   #### BundlesLoadData
   Write:
   - Each broker monitors its allocated bundles’ load and stores it in the local cache, BundlesLoadData (an in-memory HashMap).
   - BundlesLoadData will not be replicated to other brokers’ caches.
   
   Read:
   - Each broker locally reads BundlesLoadData and computes the top-n high-load bundles, TopBundlesLoadData.
   - With the local BundlesLoadData, all brokers perform bundle splits without going through the leader.
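
   For example, the top-n selection over the local cache could look like the following sketch (the single load score is illustrative; the real metric may combine several dimensions):

   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   // Pick the n highest-loaded bundles from the local in-memory map.
   static List<BundleLoadData> topBundles(Map<String, BundleLoadData> bundlesLoadData, int n) {
       return bundlesLoadData.values().stream()
               .sorted(Comparator.comparingDouble(
                       (BundleLoadData b) -> b.msgThroughputIn + b.msgThroughputOut).reversed())
               .limit(n)
               .collect(Collectors.toList());
   }
   ```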
   
   #### TopBundlesLoadData
   Write:
   - Each broker periodically computes TopBundlesLoadData and publishes it to TopBundlesLoadDataStore (TableView, non-persistent).
       - We will add a TTL policy to tombstone old KVs in TopBundlesLoadDataStore.
   
   Read:
   - Only the leader broker consumes TopBundlesLoadDataStore.
       - With the aggregated TopBundlesLoadData and BrokerLoadData, the leader initiates bundle unload (transfer) operations.
   
   #### Load Data Flow
   
   
![loadDataModel_v2](https://user-images.githubusercontent.com/103456639/179900738-b492415f-713a-4860-84ef-ab2aa8577240.png)
   
   
   ### Major Modifications on Bundle Split, Unload, and Assignment Flow
   - With the local BundlesLoadData, all brokers perform bundle splits without going through the leader. By default, newly split bundles will be the targets to unload (transfer).
   - With the aggregated BrokerLoadData, all brokers perform bundle assignments without going through the leader.
   - With the aggregated TopBundlesLoadData and BrokerLoadData, the leader makes decisions to unload (transfer) bundles.
   - We will add a new bundle unload option, transfer, which transfers bundles from one broker to another.
   - We will introduce a global channel (Bundle State Channel) to share consistent, linearized bundle state changes with brokers.
   
   ### Bundle State Channel
   This bundle state channel is a persistent topic table-view used as a WAL to broadcast the total order of all bundle state changes in the cluster. All brokers will asynchronously consume messages in this channel in the same order and react to bundle state changes (sequential consistency). With the table-view compaction, the bundle state channel will eventually materialize the current bundle-broker ownership. Read operations on this channel can be deferred (e.g., clients’ topic lookup requests) for a few seconds, depending on the current state of the bundle.
   
   #### Bundle State Lifecycles
   We define the following states and actions and linearize the bundle state changes. (This is a high-level design to explain the concept; the final version may differ.)
   
   ![Bundle States (5)](https://user-images.githubusercontent.com/103456639/179900786-e37f078f-537b-493e-be3e-ae60e78aaf32.jpg)
   
   
   Bundle Actions
   - Own: take ownership of the bundle.
       - The owner broker is selected by the local load manager.
   - Transfer: transfer the bundle ownership to the destination broker.
       - The source broker internally disables the bundle ownership.
       - The destination broker owns the bundle.
   - Return: return deferred client connections with the destination broker URL.
       - Close the existing connections if the bundle is already being served.
   - Split: split the target (parent) bundle into child bundles.
   - Create: create the child bundle entries in the channel, initially assigned to the local broker.
   - Discard: discard the bundle entry in the channel (tombstone operation).
   - Unload: unload the bundle ownership from the owner broker.
       - Disable the bundle ownership.
       - Close client connections under the bundle.
       - Run the Discard action.
   
   
   Bundle States
   - Assigned: assigned to a broker.
   - Assigning: in the process of assigning ownership.
   - Splitting: in the process of splitting a bundle range.
   - Unassigned: not assigned to any broker (removed from the channel).

   *New client connections to the bundle are deferred (with timeouts) in the Assigning state.
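
   A minimal sketch of these states and actions as Java enums (names are illustrative; the final code may differ):

   ```java
   // Bundle lifecycle states from the diagram above.
   enum BundleState {
       ASSIGNED,   // owned by a broker
       ASSIGNING,  // ownership assignment in progress; new lookups are deferred
       SPLITTING,  // bundle range split in progress
       UNASSIGNED  // not owned by any broker; entry removed from the channel
   }

   // Actions broadcast on the bundle state channel.
   enum BundleAction {
       OWN, TRANSFER, RETURN, SPLIT, CREATE, DISCARD, UNLOAD
   }
   ```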
   
   #### Bundle State Change Examples
   The bundle state channel can be used as in the following examples.
   
   ##### Bundle Transfer Example
   (State, Action) Sequence:
   (Assigned, Transfer) => (Assigning, Return) => (Assigned,)
   
   1. The leader finds target bundles from TopBundlesLoadData and initiates a bundle unload (a transfer) by broadcasting the unload state change to the bundle state channel, keyed by the bundleName.
       e.g. {key:bundleName, value:{flow:transfer, action:transfer, state:assigning, from:A, to:B}}
   2. All brokers will consume the state change message in the channel.
   3. Upon consuming the message from the channel, if the state change involves the local broker, the broker performs its role and updates the state back in the channel to continue the state change (see the sketch after this list). If there are state changes conflicting with the ongoing one, they are ignored.
   4. Meanwhile, if other brokers (e.g., broker C) receive lookup requests for the bundle, the clients’ connections will be deferred (with timeouts) until they receive the “Return” action. When the “Return” action is broadcast, all brokers will return the pending connections with the owner broker’s URL. Also, the existing connections to the source broker will be closed.
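
   To make step 3 concrete, each broker could react to the transfer message roughly as follows (a sketch on top of a table-view listener; localBroker, the message fields, and the helper methods are assumptions for illustration):

   ```java
   // Each broker consumes the bundle state channel and plays its role.
   bundleStateChannel.listen((bundleName, change) -> {
       if (change == null) {
           return; // tombstone: the bundle was discarded (unassigned)
       }
       if (change.action == BundleAction.TRANSFER) {
           if (localBroker.equals(change.from)) {
               disableOwnership(bundleName);  // hypothetical helper: stop serving
           }
           if (localBroker.equals(change.to)) {
               ownBundle(bundleName);         // hypothetical helper: start serving
               // Continue the state change: broadcast "Return" so all brokers
               // release deferred lookups with this broker's URL.
               broadcast(bundleName, BundleAction.RETURN, localBroker);
           }
       }
   });
   ```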
   
   ##### Bundle Split Example
   (State, Action) Sequence:
   (Assigned, Split) => (Splitting, Unload | Create) => {(Unassigned, ) | (Assigned, ), (Assigned, )}
   
   1. Each owner broker monitors the local BundlesLoadData and initiates a bundle split by broadcasting the split state change to the bundle state channel, keyed by the bundleName.
       e.g. {key:bundleName, value:{flow:split, action:split, state:splitting, from:A, to:B, transfer:true}}
   2. Same as Bundle Transfer Example step 2.
   3. Same as Bundle Transfer Example step 3.
       a. After the “Split,” the owner broadcasts the child bundles’ ownership creation (state=assigned) and the parent bundle’s ownership unload (empty message).
       b. By default, the owner publishes a message to the TopBundlesLoadData store asking the leader to unload (or transfer) the child bundles.
   
   ##### Bundle Assignment Example
   (State, Action) Sequence: 
   (Unassigned, Own) => (Assigning, Return) => (Assigned,)
   
   1. When requested by clients, the first connected broker checks whether any broker in the state channel owns the bundle and returns the owner broker URL if found. Otherwise, it initiates a bundle assignment by broadcasting the assignment state change.
       e.g. {key:bundleName, value:{flow:assignment, action:own, state:assigning, to:B}}
   2. Same as Bundle Transfer Example step 2.
   3. Same as Bundle Transfer Example step 3.
   4. Same as Bundle Transfer Example step 4.
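
   One way to implement the deferred connections in step 1 (and step 4 of the transfer example) is to park lookups on futures that are completed when the “Return” action is consumed; a sketch with hypothetical names:

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.TimeUnit;

   // Lookups arriving while a bundle is Assigning are parked here and
   // completed with the owner URL when the "Return" action is consumed.
   Map<String, CompletableFuture<String>> pendingLookups = new ConcurrentHashMap<>();

   CompletableFuture<String> lookup(String bundleName) {
       String owner = currentOwnerFromChannel(bundleName); // hypothetical helper
       if (owner != null) {
           return CompletableFuture.completedFuture(owner);
       }
       // Defer with a timeout so clients are not parked forever; the timeout
       // is applied to a derived future so the shared one stays usable.
       return pendingLookups
               .computeIfAbsent(bundleName, b -> new CompletableFuture<>())
               .thenApply(url -> url)
               .orTimeout(30, TimeUnit.SECONDS);
   }

   // Invoked when a "Return" state change for the bundle is consumed.
   void onReturn(String bundleName, String ownerUrl) {
       CompletableFuture<String> pending = pendingLookups.remove(bundleName);
       if (pending != null) {
           pending.complete(ownerUrl);
       }
   }
   ```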
   
   #### Bundle-Broker Ownership State
   Because the bundle state channel shows the current bundle-broker ownership, 
we can remove the redundant bundle ownership store(ZK znodes). Each broker will 
look up the bundle ownership channel to check which broker currently owns the 
requested bundles or is in the ownership assignment/unload(transfer) process. 
Besides, before return, the broker availability metadata store(LocalBrokerData 
znode existence) could be checked to confirm the owner brokers' availability 
further.
   
   #### Bundle State Channel Owner Selection and Discovery
   Bundle State Channel(BSC) is another topic, and because of its circular 
dependency, we can't use the BundleStateChannel to find the owner broker of the 
BSC topic. For example, when a cluster starts, each broker needs to initiate 
BSC TopicLookUp(to find the owner broker) in order to consume the messages in 
BSC. However, initially, each broker does not know which broker owns the BSC.
   
   The ZK leader election can be a good option to break this circular 
dependency, like the followings.
   
   
   ##### Channel Owner Selection
   The cluster can use the ZK leader election to select the owner broker. If 
the owner becomes unavailable, one of the followers will become the new owner. 
We can elect the owner for each bundle state partition.
   
   ##### Channel Owner Discovery
   Then, in brokers’ TopicLookUp logic, we will add a special case to return 
the current leader(the elected BSC owner) for the BSC topics. 
   
   
   #### Conflict State Resolution(Race Conditions)
   Without distributed locks, we can resolve conflicting state changes by a 
conflict state resolution algorithm in an optimistic and eventual manner. 
Brokers can take the first valid state change in the linearized view as the 
winner state and ignore the later ones. 
   
   One caveat is that because the current table-view compaction takes only the 
last ones as the result values, we need to introduce an internal compaction 
algo for this channel to follow the conflict resolution algorithm(the first 
valid state change as the result value).
   
   ```java
   // Bundle state conflict resolution, sketched in Java (the helper names
   // are illustrative). For each bundle: A[i] is the i-th linearized state
   // change action, and S is the current bundle state after A[i-1], where
   // the sequence number i monotonically increases.
   for (BundleStateChange action : linearizedChanges(bundle)) { // A[i], in order
       BundleState current = stateOf(bundle);                   // S
       if (!isValidTransition(current, action)) {
           // No arrow in the state diagram from S: reject A[i] and keep S.
           continue;
       }
       accept(bundle, action); // accept A[i]; S advances
   }
   ```
   
   For instance, suppose two conflicting assignments are initiated for bundle x. The linearized state change messages will look like the following:
   (own, to:B), (own, to:A)
   By the conflict resolution algorithm, the second state change (own, to:A) will be ignored by all brokers (and by the compaction algorithm). Eventually, the “return” message will be broadcast, declaring that the owner is “B”:
   (own, to:B), (own, to:A), (return, to:B)

   Let’s take another example. Suppose bundle x is already assigned to broker B, but another broker initiates the “own” action (before consuming the “return” action). This last “own” state change will be ignored, since the action “own” is invalid from the previous state, “assigned” (in the state diagram above, there is no “own” action arrow from the “assigned” state):
   (own, to:B), (return, to:B), (own, to:A)
   
   #### Failure Recovery
    
   ##### When a broker is down
   When state change participants (brokers) suddenly become unavailable, a state change could become an orphan, as the participants no longer play their roles. For these orphan state changes, the leader broker will run orphan state clean-up logic. For instance, the leader can add the bundle state clean-up logic to the broker unavailability notification handler (znode watcher) in order to clean the pending bundle state changes and ownerships from unavailable brokers. Also, to make the clean-up logic further fault-tolerant, the leader broker will run the clean-up function when it initializes. Additionally, we could make the leader periodically call the clean-up in a separate monitor thread (we shouldn’t redundantly call this clean-up too often).
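
   For illustration, the leader’s clean-up hook could look roughly like this (a sketch; the channel iteration and helpers are assumptions):

   ```java
   // Leader-only: clean up state changes and ownerships left behind by an
   // unavailable broker. Invoked from the znode-watcher notification, at
   // leader initialization, and optionally from a periodic monitor thread.
   void cleanupOrphanStates(String unavailableBroker) {
       bundleStateChannel.forEach((bundleName, change) -> {
           if (change != null && unavailableBroker.equals(change.to)) {
               // The owner (or pending owner) is gone; unload so the bundle
               // can be re-assigned instead of staying orphaned.
               broadcast(bundleName, BundleAction.UNLOAD, unavailableBroker);
           }
       });
   }
   ```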
   
   ##### When the entire ZK is down and comes back
   Every broker will be notified when its ZK session experiences connection issues. The brokers will then enter a “safe” mode, serving the existing topics as-is but not allowing ZK-related operations. The leader won’t run the bundle clean-up, transfer, or unload logic while it knows ZK is down.

   When ZK comes back, each broker will know its ZK sessions are re-established. The brokers will wait 2-3 minutes for all brokers to complete the ZK handshaking. Then, they will recover the bundle state table-view and return to normal mode.
   
   
   
   #### Bundle State and Load Data TableView Scalability
   
   Expected read/write traffic:
   - Write: relatively few messages on the write path, with occasional spikes.
   - Read: the fan-out broadcast could become a bottleneck when the cluster is enormous.

   The bundle state channel is relatively lightweight on the producer side because bundle state changes are relatively infrequent. Still, message dispatch to the consumers could be heavier if the cluster is very large. The same issue can happen to the other table-views (e.g., BrokerLoadDataStore) introduced in this proposal. We could consider the following methods to scale the table-views’ produce/consume rates in a large cluster.
   
   ##### Split Broker Cluster to multiple clusters
   Simply, one can split a massive broker cluster into multiple clusters with different endpoints. The BookKeeper and configuration layers can be shared among the broker clusters.
   
   ##### Partitioned Table-View (short-term)
   One can build the table-views on partitioned topics. Then, we can distribute the message load to multiple partition owner brokers.
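
   For example, the backing topic can be created as a partitioned topic so that dispatch spreads across the partitions’ owner brokers, while producers and table-views use it transparently (a sketch; the topic name and partition count are illustrative):

   ```java
   import org.apache.pulsar.client.admin.PulsarAdmin;

   PulsarAdmin admin = PulsarAdmin.builder()
           .serviceHttpUrl("http://localhost:8080")
           .build();

   // Hypothetical channel topic; 4 partitions spread the message load
   // across up to 4 partition owner brokers.
   admin.topics().createPartitionedTopic(
           "persistent://pulsar/system/bundle-state-channel", 4);
   ```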
   
   ##### Sharding (long-term)
   As the conventional scalability method, one could shard the cluster into multiple groups of brokers. Then, we can create a separate channel for each shard of brokers. This means we need an additional discovery layer to map topics to broker shards (which also needs to align with Namespace Isolation Policies).
   
   We should mention that this metadata-sync scalability issue is not new in Pulsar, as the current Pulsar uses n-replication. For instance, all brokers’ and all bundles’ load metadata are replicated to all brokers via ZK watchers. Currently, the distributed ZK servers send znode watch notifications to their clients (brokers). In this proposal, multiple table-view owner brokers (with partitioned table-views) can dispatch metadata change messages to the participants (brokers).
   
   We think this metadata-sync scalability work is relatively low-priority, as only a few customers run Pulsar clusters at such a large scale. We could first ask those customers to split the cluster into multiple clusters and then enable partitioned table-views. It is not practical for a single cluster to have thousands of brokers. However, we still want to ensure this design is seamlessly extensible, as a two-way-door decision.
   
   ## Rejected Alternatives
   N/A

