Apologies for the delay in responding here. The idea of making the RateLimiter config/creation logic generic across connectors makes sense to me.
In the approach that we used and tested internally, we essentially created a Guava RateLimiter <https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html> within the *KafkaConsumerThread* with a desired rate, and moved the *consumer.poll()* call into a separate method that uses the bytes received from every call to *poll()* to control the rate (i.e., as the parameter to the *acquire()* call). We have abstracted the rate-limiting configuration and creation into a RateLimiterFactory to make it reusable for other connectors.

I have also added some of the results we got from testing this approach to the Flink JIRA: https://issues.apache.org/jira/browse/FLINK-11501. I'll share a PR with the approach shortly, and hopefully we can use that as a starting point to discuss this feature further.

Thanks
Lakshmi

On Fri, Feb 1, 2019 at 8:08 PM Becket Qin <becket....@gmail.com> wrote:

> Hi Thomas,
>
> Yes, adding a rate limiting operator in front of the sink would work for
> record rate limiting.
>
> Another thing I am thinking is that for local throttling, it seems that
> throttling in sources and sinks has some subtle differences. For example,
> consider both source and sink as HDFS. For sources, a rate limiter in each
> task could work independently without a problem; the total throughput will
> be throttled at PARALLELISM * PER_TASK_THRESHOLD.
>
> On the sink side it might be a little different. After some aggregations,
> the data might become skewed. In that case, some sink tasks with hot keys
> may hit the rate limit and create back pressure, while the other sink
> tasks are pretty idle. This could result in over-throttling.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Feb 2, 2019 at 12:20 AM Thomas Weise <t...@apache.org> wrote:
>
> > Hi Becket,
> >
> > The throttling operator would suffer from the same issue of not being
> > able to accurately count bytes.
> >
> > On the other hand, it can be used by composition w/o modifying existing
> > operators.
> >
> > As for sinks, wouldn't an operator that adjusts the rate in front of
> > the sink suffice?
> >
> > Thomas
> >
> > On Thu, Jan 31, 2019 at 11:42 PM Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Thomas,
> > >
> > > Good point about counting bytes. It would be difficult to throttle
> > > the byte rate with the existing API. And it seems that for sinks we
> > > have to do that rate limiting in the sink implementation anyway.
> > > There are a few ways to do some abstraction, but maybe adding a
> > > RateLimiter is trivial enough that we don't need to worry about
> > > reusing the throttling logic.
> > >
> > > But in any case, let's make sure the throttling threshold
> > > configuration names are the same for all the sources and sinks. So
> > > the config parsing logic should probably still be put together in one
> > > place. That is probably an implementation detail we can discuss when
> > > reviewing the patch.
> > >
> > > I am not sure about adding another throttling operator. How would
> > > that operator get the serialized size if it is downstream of a
> > > source? And how would that work on the sink side?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Feb 1, 2019 at 11:33 AM Thomas Weise <t...@apache.org> wrote:
> > >
> > > > I initially thought of an approach similar to the collector idea,
> > > > by overriding emitRecord in the fetcher. That makes counting the
> > > > bytes difficult, because it's downstream of decoding.
> > > >
> > > > Another idea for solving this in a reusable way was to have a
> > > > separate rate limiting operator chained downstream of the consumer,
> > > > which would develop back pressure and slow down the consumer.
> > > > However, that would interfere with checkpoint barrier alignment
> > > > (AFAIK, currently the checkpoint barrier will also be stuck in the
> > > > backlog)?
> > > >
> > > > Thomas
> > > >
> > > > On Thu, Jan 31, 2019 at 7:13 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> > > >
> > > > > +1, and something I was planning to comment on in the Jira issue.
> > > > >
> > > > > Also, if rate limiting could effectively stop the stream, then
> > > > > this could be used to solve a common data enrichment issue.
> > > > >
> > > > > Logically you want to pause one stream (typically the time series
> > > > > data being processed) while another stream (typically the
> > > > > broadcast) is broadcasting an update to enrichment data.
> > > > >
> > > > > Currently you have to buffer the time series data in your
> > > > > enrichment function, but if the rate limiter was pluggable, it
> > > > > could detect when this enrichment update was happening.
> > > > >
> > > > > — Ken
> > > > >
> > > > > > On Jan 31, 2019, at 6:10 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > > >
> > > > > > Hi Jamie,
> > > > > >
> > > > > > Thanks for the explanation. That makes sense to me. I am
> > > > > > wondering if there is a more general way to add a rate limiter
> > > > > > to all the connectors rather than doing that for each
> > > > > > individual one. For example, maybe we can have the rate
> > > > > > limiting logic in the Collector / Output, thus all the
> > > > > > connectors (even operators?) could be rate limited.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Feb 1, 2019 at 7:23 AM Lakshmi Gururaja Rao <l...@lyft.com.invalid> wrote:
> > > > > >
> > > > > > > Thanks for adding more context @Jamie Grier <jgr...@lyft.com>.
> > > > > > >
> > > > > > > JIRA for this feature:
> > > > > > > https://issues.apache.org/jira/browse/FLINK-11501
> > > > > > >
> > > > > > > Thanks
> > > > > > > Lakshmi
> > > > > > >
> > > > > > > On Thu, Jan 31, 2019 at 3:20 PM Thomas Weise <t...@apache.org> wrote:
> > > > > > >
> > > > > > > > I think it would be reasonable to have a rate limiter
> > > > > > > > option in the consumer, given that others have also looked
> > > > > > > > to solve this.
> > > > > > > >
> > > > > > > > I think for this and other optional features, it would be
> > > > > > > > good to implement them in a way that overrides are
> > > > > > > > possible. Someone else may want to do the limiting
> > > > > > > > differently, taking into account more/other factors.
> > > > > > > >
> > > > > > > > Both adding the limiter and making the consumer code more
> > > > > > > > adaptable could also be split into separate work.
> > > > > > > >
> > > > > > > > BTW, is there a JIRA for this?
> > > > > > > >
> > > > > > > > Thomas
> > > > > > >
> > > > > > > --
> > > > > > > *Lakshmi Gururaja Rao*
> > > > > > > SWE
> > > > > > > 217.778.7218
> > > > >
> > > > > --------------------------
> > > > > Ken Krugler
> > > > > +1 530-210-6378
> > > > > http://www.scaleunlimited.com
> > > > > Custom big data solutions & training
> > > > > Flink, Solr, Hadoop, Cascading & Cassandra

--
*Lakshmi Gururaja Rao*
SWE
217.778.7218
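The byte-based throttling described at the top of this thread can be sketched roughly as follows. This is a self-contained illustration only, not Flink or Guava code: the proposal uses Guava's RateLimiter, whose blocking acquire(int permits) call plays the role of the minimal SimpleRateLimiter stand-in below, and the class name, rate, and simulated poll sizes here are all hypothetical.

```java
import java.util.concurrent.TimeUnit;

public class ByteRateLimiterSketch {

    /**
     * Minimal stand-in for Guava's RateLimiter: acquire(permits) blocks
     * until the requested permits are available at the configured rate.
     * (Guava's implementation smooths bursts differently; this only
     * illustrates the blocking, permit-per-byte idea.)
     */
    static final class SimpleRateLimiter {
        private final double permitsPerSecond;
        private double nextFreeTime; // absolute time (s) when the next permit frees up

        SimpleRateLimiter(double permitsPerSecond) {
            this.permitsPerSecond = permitsPerSecond;
        }

        synchronized void acquire(int permits) throws InterruptedException {
            double now = System.nanoTime() / 1e9;
            double wait = Math.max(0.0, nextFreeTime - now);
            // Reserve time for these permits, then sleep out any backlog.
            nextFreeTime = Math.max(now, nextFreeTime) + permits / permitsPerSecond;
            if (wait > 0) {
                TimeUnit.NANOSECONDS.sleep((long) (wait * 1e9));
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical per-subtask threshold: ~1 MB/s.
        SimpleRateLimiter rateLimiter = new SimpleRateLimiter(1_000_000);

        long totalBytes = 0;
        for (int i = 0; i < 4; i++) {
            // Stand-in for the byte size of one consumer.poll() result;
            // in the proposal this size is fed to acquire() so the poll
            // loop is throttled by bytes actually received.
            int bytesInPoll = 512 * 1024;
            rateLimiter.acquire(bytesInPoll);
            totalBytes += bytesInPoll;
        }
        System.out.println("polled " + totalBytes + " bytes under the byte-rate limit");
    }
}
```

With 4 simulated polls of 512 KB against a 1 MB/s limit, the loop is forced to spread roughly 2 MB over about 1.5 seconds, which is the back-pressure effect the KafkaConsumerThread change relies on.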