lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843861334


##########
pip/pip-385.md:
##########
@@ -0,0 +1,398 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+<details>
+  <summary><h2>Table of Contents</h2></summary>
+
+- [Background knowledge](#background-knowledge)
+  * [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+  * [In Scope](#in-scope)
+  * [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+  * [New binary protocol commands](#new-binary-protocol-commands)
+  * [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+  * [High-level Implementation Details](#high-level-implementation-details)
+    + [Broker Changes](#broker-changes)
+    + [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
+    + [Java Client Changes](#java-client-changes-1)
+    + [Blocking messages to be sent during throttling](#blocking-messages-to-be-sent-during-throttling)
+    + [Client side rate limit exception](#client-side-rate-limit-exception)
+  * [Public-facing Changes](#public-facing-changes)
+    + [Binary Protocol](#binary-protocol)
+    + [Java Client](#java-client)
+    + [Configuration](#configuration)
+    + [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+  * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+  * [Lower Protocol Client](#lower-protocol-client)
+  * [Lower Protocol Server](#lower-protocol-server)
+- [Alternatives](#alternatives)
+- [Links](#links)
+
+</details>
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+    Client->>Server: CreateProducer(reqId, myTopic)
+    Note right of Server: Check Authorization
+    Server-->>Client: ProducerSuccess(reqId, producerName)
+    Activate Client
+    Activate Server
+    Client->>Server: Send(1, message1)
+    Client->>Server: Send(2, message2)
+    Server-->>Client: SendReceipt(1, msgId1)
+    Client->>Server: Send(3, message3)
+    Client->>Server: Send(4, message4)
+    Note right of Server: Topic breaching quota
+    Activate Server
+    note right of Server: TCP channel read paused
+    Client-xServer: Send(5, message5)
+    Server-->>Client: SendReceipt(2,msgId2)
+    Server-->>Client: SendReceipt(3,msgId3)
+    Client-xServer: Send(6, message6)
+    Server-->>Client: SendReceipt(4,msgId4)
+    note right of Server: TCP channel read resumed
+    deactivate Server
+    Server-->>Server: read message 5
+    Server-->>Server: read message 6
+    Client->>Server: Send(7, message7)
+    Server-->>Client: SendReceipt(5,msgId5)
+    Server-->>Client: SendReceipt(6,msgId6)
+    Server-->>Client: SendReceipt(7,msgId7)
+
+    Client->>Server: CloseProducer(producerId, reqId)
+    Server-->>-Client: Success(reqId)
+    deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at the above example, but when looked from a wider scope,
+things start looking bad.
+Typically, the clients reuse a single TCP connection from the client to a broker to send messages to multiple topics.
+This is controlled by the client side property
+of [connectionsPerBroker](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java#L135)
+which defaults to 1. The situation is worsened by the fact that typically, a client is used to create producers for
+partitioned topics and generally an application may produce to more than one partitioned topic with the producers
+created from the same client object, thus all sharing the same tcp connection.
+
+In this situation, even when a single topic starts breaching the quota, the entire TCP connection is paused leading to
+a noisy neighbour effect where effectively all the topics that the client is producing to start getting throttled and
+observe high latencies.
+
+# Motivation
+
+The current method of inducing throttling when a topic or connection breaches quota has various challenges:
+
+* **Noisy neighbors** - Even if one topic is exceeding the quota, since the entire channel read is paused, all topics
+  sharing the same connect (for example - using the same java client object) get rate limited.
+* **Unaware clients** - clients are completely unaware that they are being rate limited. This leads to all send calls
+  taking super long time or simply timing out (assuming shorter send timeouts). If clients were aware, they can either
+  fail fast or induce back-pressure to their upstream.
+* **Impossible debugging** - Since all topics emit the rate limit metric, it is practically impossible to figure out
+  which
+  actual topic is breaching the quota in order to update the topic policies.
+* **Missing protocol** - Since rate limiting is a first class citizen of messaging sub-system, it really should be
+  present as a response in the protocol as well.
+
+# Goals
+
+## In Scope
+
+* Introduce a new binary protocol command pair to notify clients about throttling and get an acknowledgement back from
+  the clients that they respect the throttling and will stop producing further until mentioned.
+    * If acknowledgement is received within a configured time, we do not pause the connection for further reads.
+* [Java client] Add client public API interface to indicate if a producer is being throttled.
+* [Java client] Add relevant new PulsarClientException and logic to throw throttling related exception instead of
+  timeout if needed.
+* [Java client] Add OTel metrics about rate limiting.
+
+## Out of Scope
+
+* Changing the core rate limiting logic.
+* Implementation for other language clients
+* Changes in other protocols
+
+# High Level Design
+
+## New binary protocol commands
+
+We introduce a new comment which server will send to clients - `ThrottleProducer(reqId, throttleData)` and server will
+expect an acknowledgement command back within a configured time window `ThrottleProducerReceipt(reqId)`.
+
+The broker already records different levels of throttling in one way or another via metrics or counters, both at a topic
+level and at a connection level as well. The main design idea is that wherever today we take action to pause the
+channel, we first instead send the `ThrottleProducer` command and if we receive the `ThrottleProducerReceipt` response,
+instead of pausing the channel, we rely on clients not sending further messages for the breaching topic. If the response
+doesn't come within the configured window, we continue to pause the channel as usual.
+
+For the **case where connection level breaches** happen - i.e. breach due to maxPendingPublishRequestsPerConnection,
+maxMessagePublishBufferSizeInMB or broker level rate limit - **we continue to pause the connection**, but we still send
+the `ThrottleProducer` command in order to inform the client about the reason for any potential timeout. The reason we
+continue to pause reads is that we are already breaching memory limits, thus, even if the client sends
+a `ThrottleProducerReceipt` response, we won't be able to read it until further pending messages before that are read.
+
+Here is a sequence diagram highlighting the case when a topic level breach happens:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+    Client->>Server: CreateProducer(reqId, myTopic)
+    Note right of Server: Check Authorization
+    Server-->>Client: ProducerSuccess(reqId, producerName)
+    Activate Client
+    Activate Server
+    Client->>Server: Send(1, message1)
+    Client->>Server: Send(2, message2)
+    Server-->>Client: SendReceipt(1, msgId1)
+    Note right of Server: Check Throttling
+
+    opt topic/connection throttled
+        Server->>Client: ThrottleProducer(reqId, throttleData)
+        alt client sends receipt in time
+            Client-->>Server: ThrottleProducerReceipt(reqId)
+            note right of Client: client pauses till specified time
+            Note over Client, Server: After some time
+            Client->>Server: Send(3, message3)
+            Client->>Server: Send(4, message4)
+
+        else no response
+            Activate Server
+            note right of Server: TCP read paused
+            Client-xServer: Send(3, message3)
+            Note over Client, Server: After some time
+            opt topic/connection unthrottled
+                note right of Server: TCP read resumed
+                deactivate Server
+                Server-->>Server: read message 3
+                Client->>Server: Send(4, message4)
+            end
+        end
+    end
+    Server-->>Client: SendReceipt(2,msgId2)
+    Server-->>Client: SendReceipt(3,msgId3)
+    Server-->>Client: SendReceipt(4,msgId4)
+
+    Client->>Server: CloseProducer(producerId, reqId)
+    Server-->>-Client: Success(reqId)
+    deactivate Client
+```
+
+## Java client changes
+
+* Client will now have logic to understand the `ThrottleProducer` command and take relevant action of blocking further
+  messages for the relevant topic. It will then respond back with `ThrottleProducerReceipt` command.
+    * Client will resume message sending after the specified time in the `ThrottleProducer` command's data.
+    * This interval of no messages will be noted as "being throttled"
+    * Within this duration, another `ThrottleProducer` command from server may come.
+* Producer will record new OTel metric indicating which topic was throttled and the reason.
+* In case a message fails due to timeout and there was a throttled command from server for the owning topic, client will
+  instead throw a rate limit exception instead of timeout exception.
+
+# Detailed Design
+
+## High-level Implementation Details
+
+### Broker Changes
+
+* For calls arising from `PublishRateLimiterImpl` class, add logic in `ServerCnxThrottleTracker.java` to send the
+  command to client and wait for response for the max configured duration before calling `changeAutoRead`. It checks for
+  feature availability first.
+* For calls arising from `ServerCnxThrottleTracker::changeThrottlingFlag`, we send the command async (if feature
+  supported) without worrying about response and then call `changeAutoRead`.
+* capture `AbstractTopic::getTotalPublishRateLimitCounter` per publish rate limit counter and add relevant attribute in
+  the rate limit metric
+

Review Comment:
> Can you elaborate more about this concern? Which specific part needs to hand "fair queuing"? Maybe an example could help me understand. But I am answering below based on our past discussions on slack and my understanding of what you mean here..

Well, if there are multiple producers producing to the same topic, the throttling solution will have to throttle all producers somewhat evenly so that all producers get a fair share. It doesn't have to be perfect. Ignoring this aspect would cause the solution to not meet the goals.

The goal is to avoid TCP level throttling. That means that with multiple producers, there's a need to send out throttle commands to all producers in a way that they produce about the same amount, thus it's called "fair queuing". If there were priorities, that information could later be used to give some producers more "shares". Obviously prioritization could be scoped out from this phase. However, it's something that might be useful later and is typically a concern in multi-tenant systems with different QoS levels.

> I am trying to stray away from "fair share" issue all together as it is very difficult to solve in general. From a topic and broker perspective, the only fair thing is to not receive any further messages from any producer. Now, if we try to figure out throttle time differently for different producers based on some historic data (rate/etc), that may seem fair at first. Or even if we actually distribute the rate among all producers, which also may seem fair at first. But there are problems in both the situations:

It's not an option to stay away from the problem unless we admit that this solution doesn't support multiple producers.

> * The producer count isn't fixed. It may vary within a single throttling period itself.

The changing number of producers isn't a problem. The throttling solution on the broker side will have to consider numerous changes, and the changing number of producers doesn't make that harder.

> * The producers themselves aren't sticky. Partitioned topics is a very big real world use case and the way it works is that at any given point in time, a partitioned producer is only writing to a single partition. This "time" is defined by the batch wait time and batch max size. This value in practice can be double digit ms. So suppose a topic is throttled for 300ms, effectively, in that 300 ms, only 3 of 10 connected producers would have actually produced.. At a micro level, the distribution is not uniform at all.

This will impact accuracy, but it's not a reason to ignore fairness. Let's take a look at the current solution for fairness in `PublishRateLimiterImpl`. The solution is very simple. When publishing is throttled, the producer is added to an unthrottling queue. When the throttling period is over, producers are unthrottled one by one, as long as that doesn't go over the limit again. This doesn't ensure perfect fairness in any way, but the solution is good enough as a starting point.

For throttling based on the throttle command, a similar solution wouldn't really work. It would have to be designed in a different way. However, it doesn't have to be perfect either.

> * Given this bursty nature of producers at a sub second granularity, doing either uniform distribution (of permits/throttle time) or historic rate based - both would lead to unfair situation. Thus, i feel first cum first serve might actually work out the fairest.

This is a challenge in accuracy. We simply need a good enough solution which ensures some level of fairness while the goal of PIP-385 is achieved. We want to avoid having to throttle at the TCP level. As we know, TCP level throttling cannot be removed, since clients might misbehave, and unless that challenge is solved the broker could run out of memory or use much more memory.

If this weren't a hard problem to solve, we would have added producer throttling much earlier. I'm not making up design problems here when I'm bringing up these points. This is essential complexity of the problem domain that must be solved in a good enough way to meet the requirements.

I'm sure that we'll get there when we learn more about the problem, experiment with different solutions and continue iterating until we find those good enough solutions. It won't happen just by writing text in a design document. We will need to do many experiments along the way. That's how we will make progress.

One of the starting points could be to describe a good acceptance test plan for PIP-385. That would clarify what we are really expecting from this feature and how we will find out when we have a good enough solution in place.
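
To make the unthrottling queue described above a bit more concrete, here is a minimal sketch of the idea. It is not the actual `PublishRateLimiterImpl` code: the class name `UnthrottleQueueSketch`, its methods and the simple token accounting are all hypothetical and only illustrate "queue throttled producers, then resume them one by one while the limit allows".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Illustrative sketch only (hypothetical names, not Pulsar code):
 * throttled producers wait in a FIFO queue and are resumed one by one
 * for as long as the publish budget is not exhausted again.
 */
class UnthrottleQueueSketch {
    /** A throttled producer and the messages it will publish once resumed. */
    private static final class Pending {
        final String producer;
        final long messages;

        Pending(String producer, long messages) {
            this.producer = producer;
            this.messages = messages;
        }
    }

    private final Queue<Pending> unthrottlingQueue = new ArrayDeque<>();
    private long tokens; // remaining publish budget for the current period

    UnthrottleQueueSketch(long initialTokens) {
        this.tokens = initialTokens;
    }

    /**
     * Accepts the publish if budget remains; otherwise queues the producer.
     * Returns true when accepted, false when the producer was throttled.
     */
    boolean publish(String producer, long messages) {
        if (tokens <= 0) {
            unthrottlingQueue.add(new Pending(producer, messages));
            return false;
        }
        tokens -= messages;
        return true;
    }

    /**
     * Called when the throttling period is over. Adds the new budget, then
     * resumes queued producers in FIFO order until the budget runs out.
     * Returns the producers that were resumed in this round.
     */
    List<String> onThrottlingPeriodOver(long refilledTokens) {
        tokens += refilledTokens;
        List<String> resumed = new ArrayList<>();
        while (tokens > 0 && !unthrottlingQueue.isEmpty()) {
            Pending next = unthrottlingQueue.poll();
            tokens -= next.messages; // the resumed producer consumes budget
            resumed.add(next.producer);
        }
        return resumed;
    }

    /** Number of producers still waiting to be unthrottled. */
    int queuedProducers() {
        return unthrottlingQueue.size();
    }
}
```

With a budget of 10 messages per period and three producers each waiting with 5 messages, the first period end resumes the first two producers and the third has to wait for the next period. That is the "not perfectly fair, but good enough" behaviour mentioned above. A throttle command based design could not reuse this as-is, since the broker would be telling clients when to pause instead of deciding when to resume reading.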
