heesung-sn commented on code in PR #20748:
URL: https://github.com/apache/pulsar/pull/20748#discussion_r1300733232


##########
pip/pip-281.md:
##########
@@ -0,0 +1,268 @@
+<!--
+RULES
+* Never place a link to an external site like Google Doc. The proposal should be in this issue entirely.
+* Use a spelling and grammar checker tool if available to you (there are plenty of free ones).
+
+PROPOSAL HEALTH CHECK
+I can read the design document and understand the problem statement and what you plan to change *without* resorting to a couple of hours of code reading just to start having a high-level understanding of the change.
+
+THESE COMMENTS
+Please remove them when done.
+-->
+
+# Background knowledge
+
+- The Pulsar broker load balancer periodically unloads bundles from overloaded brokers. During this unload process, the previous owner broker closes the topic sessions (e.g. producers, subscriptions (consumers), and managed ledgers). When the bundle is re-assigned, the new owner broker recreates the topic sessions.
+
+- Pulsar clients send `CommandLookupTopic` requests to look up (or trigger assignment of) the owner brokers of topics and then connect to them.
+
+- PIP-192 (the extensible load balancer) introduced the bundle state channel, which event-sources this unloading process as a state machine that moves through the `releasing`, `assigned`, and `owned` states, in that order. In the `releasing` state, the owner broker "releases" the bundle ownership (closes the topic sessions).
+
+- PIP-192 also introduced TransferShedder, a new shedding strategy that pre-assigns new owner brokers beforehand.
+
+
+# Motivation
+
+- When unloading closes many topic sessions, many clients need to send `CommandLookupTopic` requests at the same time, which can cause a flood of lookup requests on the brokers. This unloading process can be further optimized if we let clients connect directly to the new owner broker without follow-up `CommandLookupTopic` requests.
+- In the new load balancer (PIP-192), since the new owner broker is already known, we can modify the close command protocol to pass the destination broker URL and skip the lookup requests.
+- Also, when unloading, we can gracefully shut down managed ledgers -- we always close the old managed ledgers first and only then recreate them on the new owner, without conflicts.
+
+# Goals
+- Remove clients' lookup requests from the unload protocol to reduce the publish latency and e2e latency spikes during unloading, and to resolve the bottleneck of thundering lookups when there are a large number of topics in a cluster.
+- Gracefully shut down managed ledgers before the new owners create them, to reduce possible race conditions between ledger close and ledger creation during unloading.
+
+## In Scope
+
+<!--
+What this PIP intends to achieve once it's integrated into Pulsar.
+Why it benefits Pulsar.
+-->
+
+- This change will be added to the extensible load balancer.
+
+## Out of Scope
+
+<!--
+Describe what you have decided to keep out of scope, perhaps left for a different PIP(s).
+-->
+
+- This won't change the behavior of the existing load balancer (modular load manager).
+
+
+
+# High Level Design
+
+<!--
+Describe the design of your solution in *high level*.
+Describe the solution end to end, from a birds-eye view.
+Don't go into implementation details in this section.
+
+I should be able to finish reading from the beginning of the PIP to here (inclusive) and understand the feature and how you intend to solve it, end to end.
+
+DON'T
+* Avoid code snippets, unless it's essential to explain your intent.
+-->
+
+To achieve the goals above, we could modify the bundle transfer protocol as follows.
+The proposed protocol change is based on the bundle states from PIP-192.
+
+Basically, we close the ledgers only in the releasing state and finally disconnect clients in the owned state, passing the destination broker URL along with the disconnect. The clients then connect directly to the pre-assigned destination broker URL without lookups. Meanwhile, during this transfer, any produced messages are ignored by the source broker.
+
+Current Unload and Lookup Sequence in Extensible Load Balancer
+```mermaid
+sequenceDiagram
+    participant Clients
+    participant Owner Broker
+    participant New Owner Broker
+    participant Leader Broker
+    Leader Broker ->> Owner Broker: "state:Releasing:" close topic
+    Owner Broker ->> Owner Broker: close broker topic sessions
+    Owner Broker ->> Clients: close producers and consumers
+    Clients ->> Clients: reconnecting (initial delay 100ms)
+    Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership
+    New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership
+    Clients ->> Owner Broker: lookup
+    Owner Broker ->> Clients: redirect
+    Clients ->> New Owner Broker: lookup
+    New Owner Broker ->> Clients: return(connected)
+```
+
+Proposed Unload Sequence in Extensible Load Balancer without Lookup
+```mermaid
+sequenceDiagram
+    participant Clients
+    participant Owner Broker
+    participant New Owner Broker
+    participant Leader Broker
+    Leader Broker ->> Owner Broker: "state:Releasing:" close topic
+    Owner Broker ->> Owner Broker: close broker topic sessions (e.g. ledgers) without disconnecting producers/consumers (fenced)
+    Clients -->> Owner Broker: message pubs are ignored
+    Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership
+    New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership
+    Owner Broker ->> Owner Broker: close the fenced broker topic sessions
+    Owner Broker ->> Clients: close producers and consumers (with newOwnerBrokerUrl)
+    Clients ->> New Owner Broker: immediately connect

Review Comment:
   > > It is hard to say the exact time as this time could be more or less depending on the cluster size. I observed that p99.99 pub latency was around < 2 secs in a cluster of 10 brokers with 10k topic partitions and publishers distributed to 1000 bundles at 1000 msgs/s.
   > 
   > This is a really valuable data point! Is the 2 seconds the time for transfer or just the time from "owner broker sends assign" to "owner broker receives owned"? Also, do you have any data comparing the current load balancing interruption to this new topic transfer protocol's interruption? It'd really help to see more data.
   > 
   
   It is the p99.99 pub latency reported on the producer client. So, it should include all server-side state changes and final persist ack time.
   
   > > Could you explain your concerns about the "ignore messages" state?
   > 
   > Ignoring messages is inefficient, and in my view, our goal is to build a highly optimized message broker. The owner broker is already overloaded--it is offloading a bundle--so doing something to minimize its network and memory utilization seems valuable. Further, I want to minimize the tradeoffs associated with load balancing because too many tradeoffs will lead to less aggressive load balancing. If we can avoid ignoring messages, I think we should.
   > 
   
   > It seems to me that the current load balancing hand off protocol is the naive solution, and we're looking to replace it with a more sophisticated solution. The solution we choose will be in place for a while.
   > 
   > > The "ignore messages" state is a somewhat optimistic optimization 
without additional commands and client-side complexity, considering 1. the 
transfer protocol finishes fairly quickly and 2. the ignored messages could be 
somewhat limited by the unloading time(also deterministically controlled by 
`maxPendingMessages`). During unloading, the source broker anyway cannot ack 
messages(ignore messages) after the ledger has been closed. When the ownership 
state transfer is complete, the producer will immediately resend the pending 
messages to the dst broker within x secs(< 2 secs in the above example). Also, 
the number of pending(ignored) messages can be limited by `maxPendingMessages.`
   > 
   > I agree that it will be bound by `maxPendingMessages`, but I think it is 
unlikely that most producers will hit their limit in single digit seconds, 
which indicates that the amount of unnecessary data transfer for the bundle 
will probably be equal to the `bytes/sec` * `duration of transfer`. I also 
agree that we're not really talking about ways to decrease the amount of time 
that messages are unacked (however, I think that eager connection to the 
destination broker could lead to decreased latency).
   > 
   > > I am worried about the complexity of this change, and the gain might be 
less significant, as the unloading is mostly momentary. Also, by this PIP we 
want to introduce minimal complexity in this round with less error-prone code.
   > 
   > To be clear, this is a complex change. We are replacing a naive solution 
that involved an abrupt disconnect followed by client polling to reconnect with 
a reactive solution that tells the client where to connect. The previous 
"naive" solution was able to avoid sending ignored messages because it was 
eagerly disconnected. It is fair to want to avoid overly complex solutions, but 
I don't think it would be surprising to add a new protocol message to solve 
topic transfer.
   > 
   > In reviewing my earlier diagram, I think we could get away with 1 new 
field on the close commands and 1 new protocol message. The first new field is 
to tell clients to disconnect without trying to reconnect immediately. The 
second is to tell them the new broker owner. This command optimistically 
assumes the assigned broker will transition to the owner broker. (Is that a 
safe assumption?) The broker code could be updated to make sure that when a 
topic (bundle) is being loaded, it doesn't reject producer and consumer create 
requests, but holds them on the line to prevent unnecessary retries.
   > 
   > ```mermaid
   > sequenceDiagram
   >     participant Clients
   >     participant Owner Broker
   >     participant New Owner Broker
   >     participant Leader Broker
   >     Leader Broker ->> Owner Broker: "state:Releasing:" close topic
   >     Owner Broker ->> Owner Broker: close broker topic sessions
   >     Owner Broker ->> Clients: close producers and consumers (and indicate new broker command will come)
   >     Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership
   >     Owner Broker ->> Clients: tell clients to connect to new owner broker
   >     Clients ->> New Owner Broker: immediately connect and attempt to create producers/consumers (broker does not reject)
   >     New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership
   >     New Owner Broker ->> Clients: PRODUCER_SUCCESS and SUBSCRIBE_SUCCESS commands
   > ```
   > What do you think? Thanks!
   
   If we want the prepare commands (e.g. PrepareCloseProducerCommand), I think we can add the dstBrokerUrls too and make the clients immediately connect to the brokers upon `prepare`. Still, any in-flight messages need to be ignored at the source brokers.
   
   
   ```mermaid
   sequenceDiagram
       participant Clients
       participant Owner Broker
       participant New Owner Broker
       participant Leader Broker
       Leader Broker ->> Owner Broker: "state:Releasing:" close topic
       Owner Broker ->> Clients: pause producers and consumers
       Clients ->> New Owner Broker: open a new connection to the dest broker
       Owner Broker ->> Owner Broker: close broker topic sessions (e.g. ledgers) without disconnecting producers/consumers (fenced)
       Clients -->> Owner Broker: message pubs are ignored
       Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership
       New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership
       Owner Broker ->> Owner Broker: close the fenced broker topic sessions
       Owner Broker ->> Clients: close producers and consumers (with newOwnerBrokerUrl)
       Clients ->> New Owner Broker: connect and send create producer|consumer
   
   ```
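   
   To make the last two steps of the diagram concrete, here is a minimal, hypothetical Java sketch of the client-side decision when a close command arrives. The `CloseProducerCmd` record, the `dstBrokerServiceUrl` field, and the reconnect helpers are illustrative names only (not actual Pulsar client internals); the point is simply that a close command carrying the destination URL lets the client skip the lookup round trip, while a command without it falls back to the existing lookup path.
   
   ```java
   import java.util.Optional;
   
   public class CloseProducerHandler {
   
       // Stand-in for the close command the source broker would send in the
       // "owned" step; in the real protocol this would be a new optional field
       // on the existing close producer/consumer commands.
       record CloseProducerCmd(long producerId, Optional<String> dstBrokerServiceUrl) {}
   
       void handleCloseProducer(CloseProducerCmd cmd) {
           if (cmd.dstBrokerServiceUrl().isPresent()) {
               // Transfer case: the new owner is already known, so connect to it
               // directly and skip the CommandLookupTopic round trip.
               reconnect(cmd.dstBrokerServiceUrl().get());
           } else {
               // Plain unload or an older broker: fall back to the existing
               // lookup-then-reconnect path with the usual backoff.
               reconnectAfterLookup();
           }
       }
   
       void reconnect(String brokerServiceUrl) {
           System.out.println("connecting directly to " + brokerServiceUrl
                   + " and re-sending the producer create request plus pending messages");
       }
   
       void reconnectAfterLookup() {
           System.out.println("issuing CommandLookupTopic, then reconnecting");
       }
   }
   ```
   
   A nice property of keeping the destination URL optional is backward compatibility: brokers that never set it simply leave clients on the existing lookup path.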
   
   
   I agree that the additional prepare command has the following advantages:
   - it can forcefully pause producers from sending messages, which could save network bandwidth during unloading.
   - it can make clients pre-connect to the destination broker, which could help reduce the latency.
   
   However, consider the following:
   
   - Even if we send the "prepare" commands to the clients, any in-flight messages need to be ignored anyway.
   - The network bandwidth saving could be negligible: the ignored messages are limited by `maxPendingMessages` (default 0 since 2.10, previously 1000) and the memory-based pending message limit (64 MB since 2.10); see the sketch after this list. The ignored messages are also limited by how fast the unloading finishes (optimistically, < 2 secs in the above example).
   - It requires brokers to additionally send the prepare command to the clients, adding client-broker protocol complexity.
   - It adds the complexity of a `paused` state in the clients. We have Java, C++, and Golang client code to maintain, so this will increase the scope and complexity of the work.
   - The pre-connection effect only helps the latency of the first message sent after the transfer, because subsequent messages reuse the connection anyway.
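   
   As a side note on the `maxPendingMessages` point above, the bound on ignored (and later resent) data is configurable today through the public Java client API. A minimal sketch, assuming the defaults cited above (the service URL and topic are placeholders):
   
   ```java
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.SizeUnit;
   
   public class PendingLimitsExample {
       public static void main(String[] args) throws Exception {
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   // Client-wide memory limit (64 MB by default since 2.10) caps the
                   // total bytes of outstanding messages across producers.
                   .memoryLimit(64, SizeUnit.MEGA_BYTES)
                   .build();
   
           Producer<byte[]> producer = client.newProducer()
                   .topic("persistent://public/default/my-topic")
                   // 0 (the 2.10 default) disables the per-producer count limit and
                   // relies on the memory limit; older clients defaulted to 1000.
                   .maxPendingMessages(0)
                   .create();
   
           producer.close();
           client.close();
       }
   }
   ```
   
   So, during a transfer, the in-flight data that the source broker might ignore is bounded by these limits (and by how quickly the transfer completes), which is why the bandwidth saving from an extra prepare command is expected to be small.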
   
   I believe this "prepare" command can be added in the next iteration(probably 
optimization phase 2), too, if we see the strong need here.
   
   


