Hi Hong,

> ## Q1: Do we need to pass in batch of messages / RequestInfo to `shouldBlock()`
If you think having strict limits is important, we could reverse the `startedRequest` and `shouldBlock` calls, as I suggested in my last email:

1. AsyncSinkWriter constructs a request with a batch of messages.
   - This can be triggered by one of three conditions: timer trigger, batch byte size threshold, or batch message count threshold.
2. AsyncSinkWriter calls RateLimitingStrategy.startedRequest(RequestInfo).
3. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock().
4. When the request completes, AsyncSinkWriter calls RateLimitingStrategy.completedRequest(RequestInfo).

> ## Q2: Do we need to expose nextBatchSize or something to affect the construction of the next batch?
> If we go with never exceeding the maxInFlightMessages, we can enter a situation where there are no in-flight requests, and the next request's batch size is larger than the maxInFlightMessages.

Isn't `maxInFlightMessages` the same as the currently misnamed `maxBufferedRequests`? Or do you maybe mean to limit the size of the next batch, the equivalent of the currently existing `AIMDRateLimitingStrategy#getRateLimit`? But that's a different limit, and +1 for keeping it.

Best,
Piotrek

On Sat, 25 Jun 2022 at 21:56, Teoh, Hong <lian...@amazon.co.uk> wrote:

> Hi Piotr,
>
> Thanks for the further questions.
>
> To answer both your questions, let's consider the different flows of interaction between RateLimitingStrategy and AsyncSinkWriter (IMO this is important, since it affects the interface!):
>
> One potential interaction flow is:
> 1. AsyncSinkWriter constructs a request with a batch of messages.
>    - This can be triggered by one of three conditions: timer trigger, batch byte size threshold, or batch message count threshold.
> 2. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock().
> 3. If the request starts, AsyncSinkWriter calls RateLimitingStrategy.startedRequest(RequestInfo).
> 4. When the request completes, AsyncSinkWriter calls RateLimitingStrategy.completedRequest(RequestInfo).
>
> ## Q1: Do we need to pass in batch of messages / RequestInfo to `shouldBlock()`
> My thoughts are that `shouldBlock()` needs to know whether (currentInFlight + newRequest > maxInFlight), and reject the request if so. An alternative that avoids passing this information in is to make RateLimitingStrategy only reject requests AFTER (currentInFlight > maxInFlight). However, I don't like the idea that a sink has the ability to exceed its `maxInFlightMessages`, since it would not be doing what it says on the box.
>
> ## Q2: Do we need to expose nextBatchSize or something to affect the construction of the next batch?
> If we go with never exceeding maxInFlightMessages, we can enter a situation where there are no in-flight requests and the next request's batch size is larger than maxInFlightMessages:
> 1) We will not update RateLimitingStrategy's maxInFlightMessages, since it is only updated once a request is started / completed.
> 2) We cannot guarantee that the next request will have a smaller batch size, since the AsyncSinkWriter constructs batches independently of the maxInFlightMessages currently set in the RateLimitingStrategy.
>
> We can close this gap by exposing `currentMaxBatchSize()` or `nextBatchSize()` in RateLimitingStrategy, which would tell the AsyncSinkWriter the maximum size of the next batch.
>
> > btw, isn't this variable misnamed? (maxBufferedRequests)
>
> Yes, you are right. It would be more appropriately named maxBufferedMessages or maxBufferedRequestEntries. However, these names are internal, so we can rename it appropriately.
>
> Regards,
> Hong
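To make the shapes being discussed concrete, here is a minimal Java sketch of the single-interface variant, using the call names from this thread (`startedRequest`, `completedRequest`, `shouldBlock`) plus Hong's proposed `currentMaxBatchSize()` / `nextBatchSize()` (named `getMaxBatchSize()` here); the `RequestInfo` fields are those from the quoted 23/06 mail below. This is only an illustration of the discussion, not the final FLIP-242 API.

// Illustrative sketch only; names follow the discussion in this thread,
// everything else is assumed.
public interface RateLimitingStrategy {

    // The writer informs the strategy that a request with a given batch
    // is being sent.
    void startedRequest(RequestInfo requestInfo);

    // The writer informs the strategy that an in-flight request finished.
    void completedRequest(RequestInfo requestInfo);

    // Whether the writer should block before sending further requests.
    boolean shouldBlock();

    // Hong's currentMaxBatchSize()/nextBatchSize() idea: an upper bound for
    // the next batch, so the writer never constructs a batch that can never
    // be sent.
    int getMaxBatchSize();
}

// Simple POJO, as sketched in the quoted 23/06 mail below.
class RequestInfo {
    int failedMessages;
    int batchSize;
    long requestStartTime;
}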
> On 24/06/2022, 17:07, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>
> Hi Hong,
>
> Thanks for your clarifications.
>
> > We can change it such that AsyncSinkWriter passes a constructed batch of messages to the RateLimitingStrategy.shouldBlock(), and that returns a boolean, so the RateLimitingStrategy can decide the evaluation logic internally.
>
> Do we need to pass the constructed batch of messages to `shouldBlock()`? Can we not call `startRequest()` first, then ask whether we should block? We could check `shouldBlock()` either after or before actually starting the request that we marked in `startRequest()` - it seems to me this could be an implementation detail. Either way, the system would behave more or less the same. Does it matter if we block one request later or sooner?
>
> > but we also have to expose "currentInFlightMessageCapacity" from the RateLimitingStrategy as well
>
> Do we have to? Currently this is checked against `AsyncSinkWriter#maxBufferedRequests`, can we not keep it like that? And btw, isn't this variable misnamed? It suggests checking the max number of requests (one async request = one batch?), but from the code it looks like it's actually `maxBufferedMessages`?
>
> Best,
> Piotrek
>
> On Fri, 24 Jun 2022 at 09:34, Teoh, Hong <lian...@amazon.co.uk> wrote:
>
> > Hi Piotr,
> >
> > Thanks for your feedback!
> >
> > > As I understand it, this effort is about replacing hardcoded `AIMDRateLimitingStrategy` with something more flexible?
> >
> > Yes :)
> >
> > > I have one main question about the design. Why are you trying to split it into three different interfaces? Can we not have a single public interface `RateLimitingStrategy`
> >
> > You're right about the intention being to separate out the (what), (when) and (how).
> >
> > The intention here was to make the separation of concerns clearer, but I like your idea to reduce the surface area of the interface. We can change it such that AsyncSinkWriter passes a constructed batch of messages to the RateLimitingStrategy.shouldBlock(), and that returns a boolean, so the RateLimitingStrategy can decide the evaluation logic internally. (That was my original idea too :) )
> >
> > 1. RateLimitingStrategy can update its internal logic in the `completeRequest` / `startRequest` methods.
> > 2. RateLimitingStrategy can provide a go/no-go decision in `shouldBlock`, given a List of requests.
> >
> > The above works, but we also have to expose "currentInFlightMessageCapacity" from the RateLimitingStrategy as well (since it is important that the batch of messages in the proposed request be constructed < currentInFlightMessageCapacity), otherwise we will end up in a situation where requests will never be sent.
> >
> > An alternative is to give RateLimitingStrategy the power to construct the size of the batches, but I think that would be bloating the responsibility of the RateLimitingStrategy a little too much. What do you think?
> >
> > Regards,
> > Hong
> >
> > On 23/06/2022, 10:05, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
> >
> > Hi Hong,
> >
> > As I understand it, this effort is about replacing the hardcoded `AIMDRateLimitingStrategy` with something more flexible? +1 for the general idea.
> >
> > If I read it correctly, there are basically three issues:
> > 1. (what) `AIMDRateLimitingStrategy` is only able to limit the size of all in-flight records across all batches, not the number of in-flight batches.
> > 2. (when) Currently `AsyncSinkWriter` decides whether and when to scale up or down. You would like this to be customisable behaviour.
> > 3. (how) The actual `scaleUp()` and `scaleDown()` behaviours are hardcoded, and this could be customised as well.
> >
> > Right? Assuming so, I have one main question about the design. Why are you trying to split it into three different interfaces? Can we not have a single public interface `RateLimitingStrategy`, instead of the three that you proposed, with methods like:
> >
> > `bool shouldRateLimit()` / `bool shouldBlock()`
> > `void startedRequest(RequestInfo requestInfo)`
> > `void completedRequest(RequestInfo requestInfo)`
> >
> > where `RequestInfo` is a simple POJO, similar to the `CongestionControlInfo` that you suggested:
> >
> > public class RequestInfo {
> >     int failedMessages;
> >     int batchSize;
> >     long requestStartTime;
> > }
> >
> > I think it would be more flexible and at the same time a simpler public interface. Also, we could provide the same builder that you proposed in "Example configuring the Congestion Control Strategy using the new interfaces". Or am I missing something?
> >
> > Best,
> > Piotrek
> >
> > On Mon, 20 Jun 2022 at 09:52, Teoh, Hong <lian...@amazon.co.uk.invalid> wrote:
> >
> > > Hi all,
> > >
> > > I would like to open a discussion on FLIP-242: Introduce configurable CongestionControlStrategy for Async Sink.
> > >
> > > The Async Sink was introduced as part of FLIP-171 [1], and implements a non-configurable congestion control strategy to reduce network congestion when the destination rejects or throttles requests. We want to make this configurable, to allow the sink implementer to decide the desired congestion control behaviour for a specific destination.
> > >
> > > This is a step towards reducing the barrier to entry for writing new async sinks in Flink!
> > >
> > > You can find more details in FLIP-242 [2]. Looking forward to your feedback!
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+CongestionControlStrategy+for+Async+Sink
> > >
> > > Regards,
> > > Hong
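As an illustration of how the "when" and "how" of scaling could then live behind that single interface, here is a rough Java sketch of an AIMD-flavoured implementation, building on the interface sketched earlier in this thread. The constants, field names and exact scaling rules below are assumptions made for the example; they are not what the current AIMDRateLimitingStrategy or FLIP-242 actually specify.

// Rough, illustrative AIMD-style implementation of the RateLimitingStrategy
// interface sketched earlier in this thread. All concrete rules and values
// here are assumptions, not the FLIP-242 proposal.
class AimdRateLimitingStrategy implements RateLimitingStrategy {

    private final int increaseRate;       // additive increase step ("scale up")
    private final double decreaseFactor;  // multiplicative decrease ("scale down")
    private final int maxBatchSize;       // hard cap for a single batch

    private int rateLimit;                // current limit on in-flight messages
    private int inFlightMessages;         // messages currently in flight

    AimdRateLimitingStrategy(
            int initialRateLimit, int increaseRate, double decreaseFactor, int maxBatchSize) {
        this.rateLimit = initialRateLimit;
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void startedRequest(RequestInfo requestInfo) {
        inFlightMessages += requestInfo.batchSize;
    }

    @Override
    public void completedRequest(RequestInfo requestInfo) {
        inFlightMessages -= requestInfo.batchSize;
        if (requestInfo.failedMessages == 0) {
            // Fully successful request: additive increase of the limit.
            rateLimit += increaseRate;
        } else {
            // The destination pushed back: multiplicative decrease of the limit.
            rateLimit = Math.max(1, (int) (rateLimit * decreaseFactor));
        }
    }

    @Override
    public boolean shouldBlock() {
        // Block once the in-flight messages reach the current limit.
        return inFlightMessages >= rateLimit;
    }

    @Override
    public int getMaxBatchSize() {
        // Never hand the writer a batch budget larger than the current limit,
        // which closes the gap described under Q2 above.
        return Math.min(maxBatchSize, rateLimit);
    }
}

A sink implementer who wants different behaviour (for example, blocking on the number of in-flight requests rather than messages, or different scaling rules) would then only swap in another implementation of the same interface, without touching AsyncSinkWriter.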