Hi Hong,

> ## Q1: Do we need to pass in batch of messages / RequestInfo to
`shouldBlock()`

If you think having strict limits is important, we could reverse the
`startedRequest` and `shouldBlock` calls as I suggested in my last email:

1. AsyncSinkWriter constructs request with batch of messages
  - This can be triggered by one of 3 conditions: Timer trigger, batch byte
size threshold, batch number size threshold.
2. AsyncSinkWriter calls RateLimitingStrategy.startedRequest(RequestInfo)
3. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock()
4. When request completes, AsyncSinkWriter calls
RateLimitingStrategy.completedRequest(RequestInfo)

> ## Q2: Do we need to expose nextBatchSize or something to affect the
construction of the next batch?
> If we go with never exceeding the maxInFlightMessages, we can enter a
situation where there are no in-flight requests, and the next request's
batch size is larger than the maxInFlightMessages.

Isn't `maxInFlightMessages` the same as the currently misnamed
`maxBufferedRequests`? Or do you maybe mean to limit the size of the next
batch, the equivalent of the currently existing
`AIMDRateLimitingStrategy#getRateLimit`? But that's a different limit
and +1 for keeping it.

Best,
Piotrek


sob., 25 cze 2022 o 21:56 Teoh, Hong <lian...@amazon.co.uk> napisał(a):

> Hi Piotr,
>
> Thanks for the further questions.
>
> To answer both your questions, let's consider the different flows for
> interaction between RateLimitingStrategy and AsyncSinkWriter (IMO it is
> important since it affects the interface!):
>
> One potential interaction flow is:
> 1. AsyncSinkWriter constructs request with batch of messages
>   - This can be triggered by one of 3 conditions: Timer trigger, batch
> byte size threshold, batch number size threshold.
> 2. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock()
> 3. If request starts, AsyncSinkWriter calls
> RateLimitingStrategy.startedRequest(RequestInfo)
> 4. When request completes, AsyncSinkWriter calls
> RateLimitingStrategy.completedRequest(RequestInfo)
>
> ## Q1: Do we need to pass in batch of messages / RequestInfo to
> `shouldBlock()`
> My thoughts are that `shouldBlock()` needs to know whether the
> (currentInFlight + newRequest > maxInFlight), and reject if so. An
> alternative to avoid passing this info in is to make RateLimitingStrategy
> only reject the request AFTER the (currentInFlight > maxInFlight).
> However, I don't like the idea that a sink has the ability to exceed its
> `maxInFlightMessages`, since it will not be doing what it says on the box.
>
> ## Q2: Do we need to expose nextBatchSize or something to affect the
> construction of the next batch?
> If we go with never exceeding the maxInFlightMessages, we can enter a
> situation where there are no in-flight requests, and the next request's
> batch size is larger than the maxInFlightMessages.
> 1) We will not update RateLimitingStrategy's maxInFlightMessages, since
> this will only be called once a request is started / completed
> 2) We cannot guarantee the next request will have a smaller batch size,
> since the AsyncSinkWriter makes this construction independently of the
> maxInFlightMessages current set in RateLimitingStrategy.
>
> We can close this gap by exposing `currentMaxBatchSize()` or
> `nextBatchSize()` in RateLimitingStrategy that will tell AsyncSinkWriter
> the maximum size of the next batch.
>
> > btw, isn't this variable misnamed? (maxBufferedRequests)
>
> Yes you are right. It would be more appropriately named
> maxBufferedMessages or maxBufferedRequestEntries. However, these names are
> internal, so we can rename it appropriately.
>
> Regards,
> Hong
>
>
>
> On 24/06/2022, 17:07, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>
>     CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
>     Hi Hong,
>
>     Thanks for your clarifications.
>
>     > We can change it such that AsyncSinkWriter passes a constructed
> batch of
>     messages to the RateLimitingStrategy.shouldBlock(), and that returns a
>     boolean, so the RateLimitingStrategy can decide the evaluation logic
>     internally.
>
>     Do we need to pass the constructed batch of messages to
> `shouldBlock()`?
>     Can we not call `startRequest()` first, then ask whether we should
> block?
>     We could check `shouldBlock()` either after or before actually
> starting the
>     request that we marked in `startRequest()` - it seems to me this could
> be
>     an implementation detail. In either way, the system would behave more
> or
>     less the same. Does it matter if we block one request later or sooner?
>
>     > but we also have to also expose "currentInFlightMessageCapacity"
> from the
>     RateLimitingStrategy as well
>
>     Do we have to? Currently this is checked against
>     `AsyncSinkWriter#maxBufferedRequests`, can we not keep it like that?
> And
>     btw, isn't this variable misnamed? It suggests checking the max number
> of
>     requests (one async request = one batch?), but from the code it looks
> like
>     it's actually `maxBufferedMessages`?
>
>     Best,
>     Piotrek
>
>
>     pt., 24 cze 2022 o 09:34 Teoh, Hong <lian...@amazon.co.uk> napisał(a):
>
>     > Hi Piotr,
>     >
>     > Thanks for your feedback!
>     >
>     > > As I understand it, this effort is about replacing hardcoded
>     >     `AIMDRateLimitingStrategy` with something more flexible?
>     >
>     > Yes __
>     >
>     >
>     > > I have one main question about the design. Why are you
>     >     trying to split it into three different interfaces?
>     >     Can not we have a single public interface `RateLimitingStrategy`
>     >
>     > You're right about the intention being to separate out the (what),
> (when)
>     > and (how).
>     >
>     > The intention here was to make the separation of concerns clearer,
> but I
>     > like your idea to reduce the surface area in the interface.
>     > We can change it such that AsyncSinkWriter passes a constructed
> batch of
>     > messages to the RateLimitingStrategy.shouldBlock(), and that returns
> a
>     > boolean, so the RateLimitingStrategy can decide the evaluation logic
>     > internally. (That was my original idea too __ )
>     >
>     > 1. RateLimitingStrategy can update it's internal logic on
> `completeRequest
>     > ` `startRequest` methods
>     > 2. RateLimitingStrategy can provide a go/no-go decision on
> `shouldBlock`,
>     > given a List of requests.
>     >
>     > The above works, but we also have to also expose
>     > "currentInFlightMessageCapacity" from the RateLimitingStrategy as
> well
>     > (since it is important the batch of messages in the proposed request
> be
>     > constructed < currentInFlightMessageCapacity), otherwise we will end
> up in
>     > a situation where requests will never be sent.
>     >
>     > An alternative is to give RateLimitingStrategy the power to
> construct the
>     > size of the batches, I think that would be bloating the
> responsibility of
>     > the RateLimitingStrategy a little too much. What do you think?
>     >
>     >
>     > Regards,
>     > Hong
>     >
>     > On 23/06/2022, 10:05, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>     >
>     >     CAUTION: This email originated from outside of the organization.
> Do
>     > not click links or open attachments unless you can confirm the
> sender and
>     > know the content is safe.
>     >
>     >
>     >
>     >     Hi Hong,
>     >
>     >     As I understand it, this effort is about replacing hardcoded
>     >     `AIMDRateLimitingStrategy` with something more flexible? +1 for
> the
>     > general
>     >     idea.
>     >
>     >     If I read it correctly, there are basically three issues:
>     >     1. (what) `AIMDRateLimitingStrategy` is only able to limit the
> size of
>     > all
>     >     in-flight records across all batches, not the amount of in-flight
>     > batches.
>     >     2. (when) Currently `AsyncSinkWriter` decides whether and when to
>     > scale up
>     >     or down. You would like it to be customisable behaviour.
>     >     3. (how) The actual `scaleUp()` and `scaleDown()` behaviours are
>     > hardcoded,
>     >     and this could be customised as well.
>     >
>     >     Right? Assuming so, I have one main question about the design.
> Why are
>     > you
>     >     trying to split it into three different interfaces?
>     >     Can not we have a single public interface `RateLimitingStrategy`
>     > instead of
>     >     three that you proposed, that would have methods like:
>     >
>     >     `bool shouldRateLimit()` / `bool shouldBlock()`
>     >     `void startedRequest(RequestInfo requestInfo)`
>     >     `void completedRequest(RequestInfo requestInfo)`
>     >
>     >     where  `RequestInfo` is a simple POJO similar to
>     > `CongestionControlInfo`
>     >     that you suggested
>     >
>     >     public class RequestInfo {
>     >       int failedMessages;
>     >       int batchSize;
>     >       long requestStartTime;
>     >     }
>     >
>     >     I think it would be more flexible and at the same time a simpler
> public
>     >     interface. Also we could provide the same builder that you
> proposed in
>     >     "Example configuring the Congestion Control Strategy using the
> new
>     >     interfaces",
>     >     Or am I missing something?
>     >
>     >     Best Piotrek
>     >
>     >     pon., 20 cze 2022 o 09:52 Teoh, Hong
> <lian...@amazon.co.uk.invalid>
>     >     napisał(a):
>     >
>     >     > Hi all,
>     >     >
>     >     > I would like to open a discussion on FLIP-242: Introduce
> configurable
>     >     > CongestionControlStrategy for Async Sink.
>     >     >
>     >     > The Async Sink was introduced as part of FLIP-171 [1], and
>     > implements a
>     >     > non-configurable congestion control strategy to reduce network
>     > congestion
>     >     > when the destination rejects or throttles requests. We want to
> make
>     > this
>     >     > configurable, to allow the sink implementer to decide the
> desired
>     >     > congestion control behaviour given a specific destination.
>     >     >
>     >     > This is a step towards reducing the barrier to entry to
> writing new
>     > async
>     >     > sinks in Flink!
>     >     >
>     >     > You can find more details in the FLIP-242 [2]. Looking forward
> to
>     > your
>     >     > feedback!
>     >     >
>     >     > [1]
>     >     >
>     >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>     >     > [2]
>     >     >
>     >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+CongestionControlStrategy+for+Async+Sink
>     >     >
>     >     >
>     >     > Regards,
>     >     > Hong
>     >     >
>     >     >
>     >     >
>     >
>     >
>
>

Reply via email to