I agree that a global rate limiter would be ideal, but either we make all
runners implement one as part of Beam (and the requisite SDK side hooks) or
we're forcing users to deploy their own solution, which they can already do.

A good enough in-current-model solution is probably fine for many users.
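
To make that concrete, here is a minimal sketch of the sharded, in-thread
throttling described further down the thread: hash elements into N shards,
give each shard rate/N, and block the processing thread before each emit so
that excess input stays in the source. This is plain Python, not Beam API.
The names ShardThrottle, shard_for and per_shard_rate are hypothetical, and
the injectable clock/sleep exist only so the pacing can be checked without
real waiting. The cost argument is an assumption about how bytes/sec could
be supported alongside elements/sec.

```python
import time
import zlib


def shard_for(key: bytes, num_shards: int) -> int:
    """Deterministically maps a key to one of num_shards buckets."""
    return zlib.crc32(key) % num_shards


def per_shard_rate(total_rate: float, num_shards: int) -> float:
    """Each shard is allowed an equal share of the global rate."""
    return total_rate / num_shards


class ShardThrottle:
    """Paces one shard by blocking its (single) processing thread.

    `cost` can be 1 per element or the element's size in bytes, so the
    same class covers elements/sec and bytes/sec.
    """

    def __init__(self, rate_per_sec, clock=time.monotonic, sleep=time.sleep):
        if rate_per_sec <= 0:
            raise ValueError("rate_per_sec must be positive")
        self._rate = rate_per_sec
        self._clock = clock
        self._sleep = sleep
        self._next_free = clock()  # earliest time the next emit may start

    def acquire(self, cost=1.0):
        """Blocks until `cost` units may be emitted; returns seconds waited."""
        now = self._clock()
        start = max(now, self._next_free)  # idle time is not banked as burst
        wait = start - now
        if wait > 0:
            self._sleep(wait)  # blocking here is what creates backpressure
        self._next_free = start + cost / self._rate
        return wait
```

In a DoFn the idea would be to keep one such throttle per shard and call
acquire() (or acquire(cost=len(payload)) for bytes/sec) right before
emitting each element. As noted in the thread, this only approximates the
global rate when all N shards are actually being processed at once.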

On Wed, Feb 21, 2024, 10:30 AM Reuven Lax via dev <dev@beam.apache.org>
wrote:

> Yes, that's true. The technique I proposed will work for simple pipelines
> in streaming (e.g. basic ETL), where the throttling threads are probably
> all scheduled. For more complicated pipelines (or batch pipelines), we
> might find that it overthrottles. Maybe a hybrid solution that uses state
> would work?
>
> Another option is to have a global token-based rate limiter, but this gets
> a bit more complicated.
>
> BTW - some sinks (especially databases) care more about the maximum number of
> concurrent connections. It's interesting to think about this scenario as
> well, but I think it's a bit orthogonal to the current discussion.
>
> Reuven
>
> On Wed, Feb 21, 2024 at 9:45 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> I like the idea of pushing back to the source much better than
>> unboundedly buffering things in state. I was trying to think of how to
>> just slow things down and one problem is that while we can easily
>> control the number of keys, it's much harder to control (or even
>> detect) the number of parallel threads at any given point in time (for
>> which the number of keys is simply an upper bound, especially in batch).
>>
>> On Wed, Feb 21, 2024 at 9:28 AM Reuven Lax <re...@google.com> wrote:
>> >
>> > Agreed that event-time throttling doesn't make sense here. In theory
>> processing-time timers have no SLA - i.e. their firing might be delayed -
>> so batch runners aren't violating the model by firing them all at the end;
>> however it does make processing time timers less useful in batch, as we see
>> here.
>> >
>> > Personally, I'm not sure I would use state and timers to implement
>> this, and I definitely wouldn't create this many keys. A few reasons
>> for this:
>> >   1. If a pipeline is receiving input faster than the throttle rate,
>> the proposed technique would shift all those elements into the DoFn's state
>> which will keep growing indefinitely. Generally we would prefer to leave
>> that backlog in the source instead of copying it into DoFn state.
>> >   2. In my experience with throttling, having too much parallelism is
>> problematic. The issue is that there is some error involved whenever you
>> throttle, and this error can accumulate across many shards (and when I've
>> done this sort of thing before, I found that the error was often biased in
>> one direction). If targeting 100,000 records/sec, this approach (if I
>> understand it correctly) would create 100,000 shards and throttle them each
>> to one element/sec. I doubt this will actually result in anything close to
>> the desired throttling.
>> >   3. Very commonly, the request is to throttle based on bytes/sec, not
>> events/sec. Anything we build should be easily extensible to bytes/sec.
>> >
>> > What I would suggest (and what Beam users have often done in the past)
>> would be to bucket the PCollection into N buckets where N is generally
>> smallish (100 buckets, 1000 buckets, depending on the expected throughput);
>> runners that support autosharding (such as Dataflow) can automatically
>> choose N. Each shard then throttles its output to rate/N. Keeping N no
>> larger than necessary minimizes the error introduced into throttling.
>> >
>> > We also don't necessarily need state/timers here - each shard is
>> processed on a single thread, so those threads can simply throttle calls to
>> OutputReceiver.output. This way if the pipeline is exceeding the threshold,
>> backpressure will tend to simply leave excess data in the source. This also
>> is a simpler design than the proposed one.
>> >
>> > A more sophisticated design might combine elements of both - buffering
>> a bounded amount of data in state when the threshold is exceeded, but
>> severely limiting the state size. However I wouldn't start here - we would
>> want to build the simpler implementation first and see how it performs.
>> >
>> > On Wed, Feb 21, 2024 at 8:53 AM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>> >>
>> >> On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I have left a note regarding the proposed splitting of batch and
>> >> > streaming expansion of this transform. In general, a need for such
>> split
>> >> > triggers doubts in me. This signals that either
>> >> >
>> >> >   a) the transform does something it should not, or
>> >> >
>> >> >   b) the Beam model is not complete in terms of being "unified"
>> >> >
>> >> > The problem that is described in the document is that in the batch
>> case
>> >> > timers are not fired appropriately.
>> >>
>> >> +1. The underlying flaw is that processing time timers are not handled
>> >> correctly in batch, but should be (even if it means keeping workers
>> >> idle?). We should fix this.
>> >>
>> >> > This is actually one of the
>> >> > motivations that led to the introduction of the @RequiresTimeSortedInput
>> >> > annotation and, though mentioned years ago as a question, I do not
>> >> > remember what arguments were used against enforcing sorting inputs by
>> >> > timestamp in the batch stateful DoFn as a requirement in the model.
>> That
>> >> > would enable the appropriate firing of timers while preserving the
>> batch
>> >> > invariant, which is that no late data is allowed. IIRC there are
>> >> > runners that do this sorting by default (at least the sorting, not
>> sure
>> >> > about the timers, but once inputs are sorted, firing timers is
>> simple).
>> >> >
>> >> > A different question is if this particular transform should maybe
>> fire
>> >> > not by event time, but rather processing time?
>> >>
>> >> Yeah, I was reading all of these as processing time. Throttling by
>> >> event time doesn't make much sense.
>> >>
>> >> > On 2/21/24 03:00, Robert Burke wrote:
>> >> > > Thanks for the design Damon! And thanks for collaborating with me
>> on getting a high-level textual description of the key implementation idea
>> down in writing. I think the solution is pretty elegant.
>> >> > >
>> >> > > I do have concerns about how different Runners might handle
>> ProcessContinuations for the Bounded Input case. I know Dataflow famously
>> has two different execution modes under the hood, but I agree with the
>> principle that ProcessContinuation.Resume should largely be in line with
>> the expected delay, though it's by no means guaranteed AFAIK.
>> >> > >
>> >> > > We should also ensure this is linked from
>> https://s.apache.org/beam-design-docs if not already.
>> >> > >
>> >> > > Robert Burke
>> >> > > Beam Go Busybody
>> >> > >
>> >> > > On 2024/02/20 14:00:00 Damon Douglas wrote:
>> >> > >> Hello Everyone,
>> >> > >>
>> >> > >> The following describes a Throttle PTransform that holds element
>> throughput
>> >> > >> to minimize downstream API overusage. Thank you for reading and
>> your
>> >> > >> valuable input.
>> >> > >>
>> >> > >> https://s.apache.org/beam-throttle-transform
>> >> > >>
>> >> > >> Best,
>> >> > >>
>> >> > >> Damon
>> >> > >>
>>
>
