grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840983604


##########
pip/pip-385.md:
##########
@@ -0,0 +1,398 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+<details>
+  <summary><h2>Table of Contents</h2></summary>
+
+- [Background knowledge](#background-knowledge)
+  * [Challenges with the current 
approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+  * [In Scope](#in-scope)
+  * [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+  * [New binary protocol commands](#new-binary-protocol-commands)
+  * [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+  * [High-level Implementation Details](#high-level-implementation-details)
+    + [Broker Changes](#broker-changes)
+    + [Determining the throttling duration for 
clients](#determining-the-throttling-duration-for-clients)
+    + [Java Client Changes](#java-client-changes-1)
+    + [Blocking messages to be sent during 
throttling](#blocking-messages-to-be-sent-during-throttling)
+    + [Client side rate limit exception](#client-side-rate-limit-exception)
+  * [Public-facing Changes](#public-facing-changes)
+    + [Binary Protocol](#binary-protocol)
+    + [Java Client](#java-client)
+    + [Configuration](#configuration)
+    + [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+  * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+  * [Lower Protocol Client](#lower-protocol-client)
+  * [Lower Protocol Server](#lower-protocol-server)
+- [Alternatives](#alternatives)
+- [Links](#links)
+
+</details>
+
+# Background knowledge
+
+Being a multi-tenant system, Pulsar supports quality-of-service constructs like topic quotas in bytes per second and
+qps (messages per second). On top of this, because a broker has only limited resources, it must additionally implement
+controls to limit resource usage, such as the size of its message publish buffer.
+
+As such, Pulsar induces throttling at multiple levels. Looking at publish-level throttling alone, here are the various
+levers that can be configured in Pulsar to rate limit a producer, a topic, or an entire connection from a client:
+
+* At the core of it, we can set a topic-level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or 
tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a 
connection level.
+  See 
[maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a 
thread level.
+  See 
[maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker-level maximum publish rate in bytes and/or messages per second.
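+
+For context, the connection- and thread-level levers above are broker configuration entries. A minimal `broker.conf` fragment (values are illustrative, not recommendations; see `broker.conf` for the authoritative defaults):
+
+```properties
+# Pause a connection once this many publish requests are in flight on it
+maxPendingPublishRequestsPerConnection=1000
+# Pause reads on an IO thread once its pending publish buffer exceeds this size
+maxMessagePublishBufferSizeInMB=200
+```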
+
+Currently, the way Pulsar uses these levers and enforces these limits is by pausing further reads from an established
+connection for a topic. This is transparent to the clients, and they continue to publish further messages with
+increased observed latency. Once the publish rates are within the limits, the broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+    Client->>Server: CreateProducer(reqId, myTopic)
+    Note right of Server: Check Authorization
+    Server-->>Client: ProducerSuccess(reqId, producerName)
+    Activate Client
+    Activate Server
+    Client->>Server: Send(1, message1)
+    Client->>Server: Send(2, message2)
+    Server-->>Client: SendReceipt(1, msgId1)
+    Client->>Server: Send(3, message3)
+    Client->>Server: Send(4, message4)
+    Note right of Server: Topic breaching quota
+    Activate Server
+    note right of Server: TCP channel read paused
+    Client-xServer: Send(5, message5)
+    Server-->>Client: SendReceipt(2,msgId2)
+    Server-->>Client: SendReceipt(3,msgId3)
+    Client-xServer: Send(6, message6)
+    Server-->>Client: SendReceipt(4,msgId4)
+    note right of Server: TCP channel read resumed
+    deactivate Server
+    Server-->>Server: read message 5
+    Server-->>Server: read message 6
+    Client->>Server: Send(7, message7)
+    Server-->>Client: SendReceipt(5,msgId5)
+    Server-->>Client: SendReceipt(6,msgId6)
+    Server-->>Client: SendReceipt(7,msgId7)
+
+    Client->>Server: CloseProducer(producerId, reqId)
+    Server-->>-Client: Success(reqId)
+    deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine in the example above, but viewed from a wider scope, problems emerge.
+Clients typically reuse a single TCP connection to a broker to send messages to multiple topics.
+This is controlled by the client-side property
+of [connectionsPerBroker](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java#L135)
+which defaults to 1. The situation is worsened by the fact that a client is typically used to create producers for
+partitioned topics, and an application may produce to more than one partitioned topic with producers created from the
+same client object, all sharing the same TCP connection.
+
+In this situation, even when a single topic starts breaching its quota, the entire TCP connection is paused, leading to
+a noisy-neighbour effect where effectively all the topics that the client is producing to get throttled and observe
+high latencies.
+
+# Motivation
+
+The current method of inducing throttling when a topic or connection breaches its quota has several challenges:
+
+* **Noisy neighbors** - Even if only one topic exceeds its quota, since the entire channel read is paused, all topics
+  sharing the same connection (for example, those using the same Java client object) get rate limited.
+* **Unaware clients** - Clients are completely unaware that they are being rate limited. This leads to all send calls
+  taking a very long time or simply timing out (assuming shorter send timeouts). If clients were aware, they could
+  either fail fast or apply back-pressure to their upstream.
+* **Impossible debugging** - Since all topics emit the rate-limit metric, it is practically impossible to figure out
+  which topic is actually breaching its quota in order to update the topic policies.
+* **Missing protocol** - Since rate limiting is a first-class concern of a messaging system, it really should be
+  represented as a response in the protocol as well.
+
+# Goals
+
+## In Scope
+
+* Introduce a new binary protocol command pair to notify clients about throttling and to get an acknowledgement back
+  from clients confirming that they respect the throttling and will stop producing for the indicated duration.
+    * If the acknowledgement is received within a configured time, we do not pause the connection for further reads.
+* [Java client] Add a public API to indicate whether a producer is being throttled.
+* [Java client] Add a relevant new PulsarClientException and logic to throw a throttling-related exception instead of
+  a timeout where appropriate.
+* [Java client] Add OTel metrics about rate limiting.
+
+## Out of Scope
+
+* Changing the core rate limiting logic.
+* Implementation for other language clients.
+* Changes in other protocols.
+
+# High Level Design
+
+## New binary protocol commands
+
+We introduce a new command that the server will send to clients - `ThrottleProducer(reqId, throttleData)` - and the
+server will expect an acknowledgement command back within a configured time window - `ThrottleProducerReceipt(reqId)`.
+
+The broker already records different levels of throttling in one way or another via metrics or counters, both at the
+topic level and at the connection level. The main design idea is that wherever we currently take action to pause the
+channel, we instead first send the `ThrottleProducer` command; if we receive the `ThrottleProducerReceipt` response,
+instead of pausing the channel, we rely on clients not sending further messages for the breaching topic. If the
+response doesn't arrive within the configured window, we pause the channel as usual.
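+
+As an illustration (field names and numbers are placeholders, not the final protocol), the new commands could take a
+shape along these lines in `PulsarApi.proto`:
+
+```protobuf
+message CommandThrottleProducer {
+    required uint64 request_id = 1;
+    // How long the client should pause sends for the topic, in milliseconds
+    required uint64 throttle_time_ms = 2;
+    // Why throttling was triggered (topic quota vs connection/broker limits)
+    optional string reason = 3;
+}
+
+message CommandThrottleProducerReceipt {
+    required uint64 request_id = 1;
+}
+```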
+
+For the **case where connection-level breaches** happen - i.e. a breach due to maxPendingPublishRequestsPerConnection,
+maxMessagePublishBufferSizeInMB, or the broker-level rate limit - **we continue to pause the connection**, but we still
+send the `ThrottleProducer` command in order to inform the client about the reason for any potential timeout. The
+reason we continue to pause reads is that we are already breaching memory limits; thus, even if the client sends
+a `ThrottleProducerReceipt` response, we won't be able to read it until the pending messages ahead of it are read.
+
+Here is a sequence diagram highlighting the case when a topic level breach 
happens:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+    Client->>Server: CreateProducer(reqId, myTopic)
+    Note right of Server: Check Authorization
+    Server-->>Client: ProducerSuccess(reqId, producerName)
+    Activate Client
+    Activate Server
+    Client->>Server: Send(1, message1)
+    Client->>Server: Send(2, message2)
+    Server-->>Client: SendReceipt(1, msgId1)
+    Note right of Server: Check Throttling
+
+    opt topic/connection throttled
+        Server->>Client: ThrottleProducer(reqId, throttleData)
+        alt client sends receipt in time
+            Client-->>Server: ThrottleProducerReceipt(reqId)
+            note right of Client: client pauses till specified time
+            Note over Client, Server: After some time
+            Client->>Server: Send(3, message3)
+            Client->>Server: Send(4, message4)
+
+        else no response
+            Activate Server
+            note right of Server: TCP read paused
+            Client-xServer: Send(3, message3)
+            Note over Client, Server: After some time
+            opt topic/connection unthrottled
+                note right of Server: TCP read resumed
+                deactivate Server
+                Server-->>Server: read message 3
+                Client->>Server: Send(4, message4)
+            end
+        end
+    end
+    Server-->>Client: SendReceipt(2,msgId2)
+    Server-->>Client: SendReceipt(3,msgId3)
+    Server-->>Client: SendReceipt(4,msgId4)
+
+    Client->>Server: CloseProducer(producerId, reqId)
+    Server-->>-Client: Success(reqId)
+    deactivate Client
+```
+
+## Java client changes
+
+* The client will now understand the `ThrottleProducer` command and block further messages for the relevant topic.
+  It will then respond with a `ThrottleProducerReceipt` command.
+    * The client will resume sending messages after the time specified in the `ThrottleProducer` command's data.
+    * This interval of no messages will be noted as "being throttled".
+    * Within this duration, another `ThrottleProducer` command may arrive from the server.
+* The producer will record a new OTel metric indicating which topic was throttled and the reason.
+* If a message fails due to timeout and there was a throttle command from the server for the owning topic, the client
+  will throw a rate-limit exception instead of a timeout exception.
+
+# Detailed Design
+
+## High-level Implementation Details
+
+### Broker Changes
+
+* For calls arising from the `PublishRateLimiterImpl` class, add logic in `ServerCnxThrottleTracker.java` to send the
+  command to the client and wait for a response for the maximum configured duration before calling `changeAutoRead`.
+  Feature availability is checked first.

Review Comment:
   > This logic might need some more careful thought.
   > 
   > It feels that in the case that the client supports the throttling command, 
that the broker should be able to accept that the rate goes over the limit in a 
shorter period of time and if this continues for too long, the throttling by 
changing auto read would have to happen since the broker could run out of 
memory if this doesn't happen.
   > 
   > One way to handle this is to send the throttling command to the client as 
soon as the tokens are below a certain threshold. If the tokens get consumed to 
a certain value (it can be negative), the broker would have to change auto read 
to false.
   > 
   > The flow control will be less accurate when using the throttling command 
and period based solution. The reason for this is that there's a queue between 
the client and the broker. When the client stops sending, the broker will 
continue to receive all produced messages that are in the queue. This is the 
reason why I was more in favor of a permit (credit) based flow control 
initially when we started the discussion about this PIP. However, I think that 
the problem can be solved with the throttling command based solution, but it 
will most likely require additional details that aren't currently in this PIP. 
We can figure out the details along the way when we get further with the 
implementation.
   
   My major concern with permit-based flow control (and even if we put in a threshold to send the command, it kind of falls in the same category) is that it's a major challenge to implement correctly and efficiently in a distributed manner.
   The producers here are distributed. We cannot make assumptions about the message-rate spread of those producers, especially given how batching works (round robin) from a client's perspective - leading to sudden bursts to a subset of producers for a topic at any given time. Managing permits in this situation at a sub-second level is almost impossible without massively overshooting or undershooting the rate-limiting mark.
   
   I do understand that the producers could sneak in many more messages before the throttle acknowledgement is read, but the other way round is also not feasible, i.e. blocking the channel read until all throttle receipts are received ... as the channel is paused...
   
   > If the broker would know how many messages there are in the queues between 
the client and the broker, it would be possible for the broker to take this 
into account when calculating the throttling period and when it should send out 
the command to throttle the producer. However, this would most likely require a 
separate control channel between the broker and the client to pass this 
information.
   > 
   > It is also possible that this problem makes the throttling command useless 
for producer flow control without the need to constantly use the TCP level 
throttling. This would make this PIP not meet the original goals.
   > 
   > There are many ways around the challenges, but solving this aspect is 
perhaps the hardest problem in implementing this PIP so that it meets the 
original goals.
   > 
   > One possible solution could be to estimate the queue size by using the 
Pulsar's Ping/Pong commands. For this purpose, the broker would have to send 
Ping commands frequently and measure the round trip time from the Pong 
responses. However, [the current CommandPing and CommandPong messages don't 
currently contain any 
fields](https://github.com/apache/pulsar/blob/9a97c843a46e23a0811e2172991cd00a3af642c0/pulsar-common/src/main/proto/PulsarApi.proto#L701-L704),
 so passing a request id for correlating the Pong response with the Ping 
request isn't currently possible in the protocol. It would be necessary to add 
the `request_id` field as there is for other commands to support using 
Ping/Pong for calculating RTT.
   > 
   > The way how RTT is useful in preventing the need to block the reads (by 
changing auto read to false) is that the broker could assume that the client 
continues to produce messages with a constant rate and so that it sends the 
throttle command early enough that the threshold to block reads isn't met. I 
don't currently see other simple solutions to solve this problem. Perhaps there 
are also others. I think I had this RTT solution in mind when I first accepted 
that we can move forward with the throttle command based flow control instead 
of using permits based flow control. The permits based flow control has other 
problems so it might eventually be easier to solve flow control with the 
throttle command based flow control. In the permits based flow control, one of 
the challenges is that there could be multiple producers and if permits have 
been sent out to one producer, it could potentially prevent other producers 
from producing. It's also hard to integrate the permits based flow cont
 rol to multiple ways of throttling at multiple levels. That's why I think that 
throttle command based flow control is a better way forward, as long as we are 
able to solve the challenge that I hopefully described well enough in this 
comment.
   
   RTT is one aspect that can help predict and anticipate the throttling, but there are other aspects as well. TCP is full duplex, but in each direction (sending the throttle command, and receiving back the ack) there could be pending messages. The processing of those messages is async, though, so clearing any pending message reads (on the broker) or send-receipt reads (on the client side) and reaching the actual throttle/throttle-receipt command should be quick enough. This adds to the network-based RTT.
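   
   To put the RTT point in rough numbers, here is a back-of-envelope sketch (illustrative figures, not Pulsar measurements or APIs): while the throttle command and its receipt round-trip, the client keeps producing at roughly its current rate, so that much data will still land on the broker after it decided to throttle.
   
   ```java
   // Back-of-envelope: bytes still in flight during a throttle round trip.
   public class InFlightEstimate {
       static long inFlightBytes(long produceRateBytesPerSec, long rttMillis) {
           // The client keeps producing for roughly one RTT before it can
           // react to the throttle command.
           return produceRateBytesPerSec * rttMillis / 1000;
       }
   
       public static void main(String[] args) {
           // e.g. a producer pushing 100 MiB/s over a link with a 40 ms RTT
           System.out.println(inFlightBytes(100L * 1024 * 1024, 40)); // 4194304
       }
   }
   ```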
   
   Moreover, calculating RTT using ping/pong would face a similar challenge due to the noisiness and variability of the producer's rate at any point in time (batching and round-robin partition assignment leading to burst patterns at a sub-second level).
   
   This part of the proposal definitely needs more work, but I think it's a good starting point, with the pros outweighing the cons.
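   
   To make the threshold idea from the discussion above concrete, here is a rough self-contained sketch (all names and numbers are hypothetical, not Pulsar APIs): send the throttle command early at a soft token threshold, and only fall back to pausing TCP reads if tokens keep draining past a hard floor.
   
   ```java
   // Hypothetical sketch of a two-level throttling policy: cooperative
   // throttle command first, TCP-level pause only as a last resort.
   public class ThresholdThrottlePolicy {
       enum Action { NONE, SEND_THROTTLE_COMMAND, PAUSE_READS }
   
       private final long softThreshold; // tokens left: ask the client to pause
       private final long hardFloor;     // tokens left (may be negative): pause reads
       private boolean throttleCommandSent = false;
       private long tokens;
   
       ThresholdThrottlePolicy(long capacity, long softThreshold, long hardFloor) {
           this.tokens = capacity;
           this.softThreshold = softThreshold;
           this.hardFloor = hardFloor;
       }
   
       // Called for every message the broker reads from the connection.
       Action onMessage(long sizeBytes) {
           tokens -= sizeBytes;
           if (tokens <= hardFloor) {
               return Action.PAUSE_READS;           // protect broker memory
           }
           if (tokens <= softThreshold && !throttleCommandSent) {
               throttleCommandSent = true;
               return Action.SEND_THROTTLE_COMMAND; // cooperative throttling first
           }
           return Action.NONE;
       }
   
       // Called when the rate limiter refills tokens for the next period.
       void refill(long capacity) {
           tokens = Math.min(capacity, tokens + capacity);
           throttleCommandSent = false;
       }
   }
   ```
   
   The gap between the soft threshold and the hard floor is the slack the broker allows for messages that are already in flight while the throttle command round-trips.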



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
