Apologies for the delay in responding here.

The idea of making the Ratelimiter config/creation logic generic across
connectors makes sense to me.

In the approach that we used and tested internally, we essentially created
a Guava RateLimiter
<https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html>
within the *KafkaConsumerThread* with a desired rate and moved the
*consumer.poll()
*into a separate method that uses the bytes received from every call to *poll()
*to control the rate (i.e. as a parameter to the *acquire()* call). We have
abstracted out the RateLimiting configuration and creation into a
RateLimiterFactory to make it re-usable for other connectors.

I also added some of the results we got from testing this approach on the
FLINK JIRA - https://issues.apache.org/jira/browse/FLINK-11501 . I'll share
a PR with the approach shortly and hopefully we can use that as a starting
point to discuss this feature further.

Thanks
Lakshmi

On Fri, Feb 1, 2019 at 8:08 PM Becket Qin <becket....@gmail.com> wrote:

> Hi Thomas,
>
> Yes, adding a rate limiting operator in front of the sink would work for
> record rate limiting.
>
> Another thing I am thinking is that for local throttling, it seems that
> throttling in sources and sinks has some subtle differences. For example,
> consider both source and sink as HDFS. For sources, rate limiter in each
> task could work independently without a problem, the total throughput will
> be throttled at PARALLELISM * PER_TASK_THRESHOLD.
>
> On the sink side it might be a little different. After some aggregations,
> the data might become skewed. In that case, some sink tasks with hot keys
> may hit the rate limit and create back pressure, while the other sink tasks
> are pretty idle. This could result in over-throttling.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Feb 2, 2019 at 12:20 AM Thomas Weise <t...@apache.org> wrote:
>
> > Hi Becket,
> >
> > The throttling operator would suffer from the same issue of not being
> able
> > to accurately count bytes.
> >
> > On the other hand, it can be used by composition w/o modifying existing
> > operators.
> >
> > As for sinks, wouldn't an operator that adjusts the rate in front of the
> > sink suffice?
> >
> > Thomas
> >
> >
> > On Thu, Jan 31, 2019 at 11:42 PM Becket Qin <becket....@gmail.com>
> wrote:
> >
> > > Hi Thomas,
> > >
> > > Good point about counting bytes. It would be difficult to throttle the
> > byte
> > > rate with the existing API. And it seems that for sinks we have to do
> > that
> > > rate limiting in the sink implementation anyways. There are a few ways
> to
> > > do some abstraction, but maybe adding a RateLimiter is trivial enough
> so
> > we
> > > don't need to worry about reusing the throttling logic.
> > >
> > > But in any case, let's make sure the throttling threshold configuration
> > > names are the same for all the Source and Sinks. So the config parsing
> > > logic should probably still be put together in place. That is probably
> > some
> > > implementation details we can discuss when review the patch.
> > >
> > > I am not sure about adding another throttling operator. How would that
> > > operator get the serialized size if it is downstream of a source. And
> how
> > > would that work on the sink side?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Feb 1, 2019 at 11:33 AM Thomas Weise <t...@apache.org> wrote:
> > >
> > > > I initially thought of an approach similar to the collector idea, by
> > > > overriding emitRecord in the fetcher. That makes counting the bytes
> > > > difficult, because it's downstream of decoding.
> > > >
> > > > Another idea of solving this in a reusable way was to have a separate
> > > rate
> > > > limiting operator chained downstream of the consumer, which would
> > develop
> > > > back pressure and slow down the consumer. However, that would
> interfere
> > > > with checkpoint barrier alignment (AFAIK, currently checkpoint
> barrier
> > > will
> > > > also be stuck in the backlog)?
> > > >
> > > > Thomas
> > > >
> > > >
> > > >
> > > > On Thu, Jan 31, 2019 at 7:13 PM Ken Krugler <
> > kkrugler_li...@transpac.com
> > > >
> > > > wrote:
> > > >
> > > > > +1, and something I was planning to comment on in the Jira issue.
> > > > >
> > > > > Also, if rate limiting could effectively stop the stream, then this
> > > could
> > > > > be used solve a common data enrichment issue.
> > > > >
> > > > > Logically you want to pause one stream (typically the time series
> > data
> > > > > being processed) while another stream (typically the broadcast) is
> > > > > broadcasting an update to enrichment data.
> > > > >
> > > > > Currently you have to buffer the time series data in your
> enrichment
> > > > > function, but if the rate limiter was pluggable, it could detect
> when
> > > > this
> > > > > enrichment update was happening.
> > > > >
> > > > >  — Ken
> > > > >
> > > > > > On Jan 31, 2019, at 6:10 PM, Becket Qin <becket....@gmail.com>
> > > wrote:
> > > > > >
> > > > > > Hi Jamie,
> > > > > >
> > > > > > Thanks for the explanation. That makes sense to me. I am
> wondering
> > if
> > > > > there
> > > > > > is a more general way to add a rate limiter to all the connecters
> > > > rather
> > > > > > than doing that for each individual one. For example, maybe we
> can
> > > have
> > > > > the
> > > > > > rate limiting logic in the Collector / Output, thus all the
> > > connectors
> > > > > > (even operators?) could be rate limited.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Feb 1, 2019 at 7:23 AM Lakshmi Gururaja Rao
> > > > > <l...@lyft.com.invalid>
> > > > > > wrote:
> > > > > >
> > > > > >> Thanks for adding more context @Jamie Grier <jgr...@lyft.com> .
> > > > > >>
> > > > > >> JIRA for this feature:
> > > > > https://issues.apache.org/jira/browse/FLINK-11501.
> > > > > >>
> > > > > >> Thanks
> > > > > >> Lakshmi
> > > > > >>
> > > > > >> On Thu, Jan 31, 2019 at 3:20 PM Thomas Weise <t...@apache.org>
> > > wrote:
> > > > > >>
> > > > > >>> I think it would be reasonable to have a rate limiter option in
> > the
> > > > > >>> consumer, given that others have also looked to solve this.
> > > > > >>>
> > > > > >>> I think for this and other optional features, it would be good
> to
> > > > > >> implement
> > > > > >>> in a way that overrides are possible. Someone else may want to
> do
> > > the
> > > > > >>> limiting differently, taking into account more/other factors.
> > > > > >>>
> > > > > >>> Both, adding the limiter and making the consumer code more
> > > adoptable
> > > > > >> could
> > > > > >>> be split into separate work also.
> > > > > >>>
> > > > > >>> BTW is there a JIRA for this?
> > > > > >>>
> > > > > >>> Thomas
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> *Lakshmi Gururaja Rao*
> > > > > >> SWE
> > > > > >> 217.778.7218 <+12177787218>
> > > > > >> [image: Lyft] <http://www.lyft.com/>
> > > > > >>
> > > > >
> > > > > --------------------------
> > > > > Ken Krugler
> > > > > +1 530-210-6378
> > > > > http://www.scaleunlimited.com
> > > > > Custom big data solutions & training
> > > > > Flink, Solr, Hadoop, Cascading & Cassandra
> > > > >
> > > > >
> > > >
> > >
> >
>


-- 
*Lakshmi Gururaja Rao*
SWE
217.778.7218 <+12177787218>
[image: Lyft] <http://www.lyft.com/>

Reply via email to