Hi Thomas, Yes, adding a rate limiting operator in front of the sink would work for record rate limiting.
Another thing I am thinking is that for local throttling, it seems that throttling in sources and sinks has some subtle differences. For example, consider both source and sink as HDFS. For sources, rate limiter in each task could work independently without a problem, the total throughput will be throttled at PARALLELISM * PER_TASK_THRESHOLD. On the sink side it might be a little different. After some aggregations, the data might become skewed. In that case, some sink tasks with hot keys may hit the rate limit and create back pressure, while the other sink tasks are pretty idle. This could result in over-throttling. Thanks, Jiangjie (Becket) Qin On Sat, Feb 2, 2019 at 12:20 AM Thomas Weise <t...@apache.org> wrote: > Hi Becket, > > The throttling operator would suffer from the same issue of not being able > to accurately count bytes. > > On the other hand, it can be used by composition w/o modifying existing > operators. > > As for sinks, wouldn't an operator that adjusts the rate in front of the > sink suffice? > > Thomas > > > On Thu, Jan 31, 2019 at 11:42 PM Becket Qin <becket....@gmail.com> wrote: > > > Hi Thomas, > > > > Good point about counting bytes. It would be difficult to throttle the > byte > > rate with the existing API. And it seems that for sinks we have to do > that > > rate limiting in the sink implementation anyways. There are a few ways to > > do some abstraction, but maybe adding a RateLimiter is trivial enough so > we > > don't need to worry about reusing the throttling logic. > > > > But in any case, let's make sure the throttling threshold configuration > > names are the same for all the Source and Sinks. So the config parsing > > logic should probably still be put together in place. That is probably > some > > implementation details we can discuss when review the patch. > > > > I am not sure about adding another throttling operator. How would that > > operator get the serialized size if it is downstream of a source. And how > > would that work on the sink side? > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > > > On Fri, Feb 1, 2019 at 11:33 AM Thomas Weise <t...@apache.org> wrote: > > > > > I initially thought of an approach similar to the collector idea, by > > > overriding emitRecord in the fetcher. That makes counting the bytes > > > difficult, because it's downstream of decoding. > > > > > > Another idea of solving this in a reusable way was to have a separate > > rate > > > limiting operator chained downstream of the consumer, which would > develop > > > back pressure and slow down the consumer. However, that would interfere > > > with checkpoint barrier alignment (AFAIK, currently checkpoint barrier > > will > > > also be stuck in the backlog)? > > > > > > Thomas > > > > > > > > > > > > On Thu, Jan 31, 2019 at 7:13 PM Ken Krugler < > kkrugler_li...@transpac.com > > > > > > wrote: > > > > > > > +1, and something I was planning to comment on in the Jira issue. > > > > > > > > Also, if rate limiting could effectively stop the stream, then this > > could > > > > be used solve a common data enrichment issue. > > > > > > > > Logically you want to pause one stream (typically the time series > data > > > > being processed) while another stream (typically the broadcast) is > > > > broadcasting an update to enrichment data. > > > > > > > > Currently you have to buffer the time series data in your enrichment > > > > function, but if the rate limiter was pluggable, it could detect when > > > this > > > > enrichment update was happening. > > > > > > > > — Ken > > > > > > > > > On Jan 31, 2019, at 6:10 PM, Becket Qin <becket....@gmail.com> > > wrote: > > > > > > > > > > Hi Jamie, > > > > > > > > > > Thanks for the explanation. That makes sense to me. I am wondering > if > > > > there > > > > > is a more general way to add a rate limiter to all the connecters > > > rather > > > > > than doing that for each individual one. For example, maybe we can > > have > > > > the > > > > > rate limiting logic in the Collector / Output, thus all the > > connectors > > > > > (even operators?) could be rate limited. > > > > > > > > > > Thanks, > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > On Fri, Feb 1, 2019 at 7:23 AM Lakshmi Gururaja Rao > > > > <l...@lyft.com.invalid> > > > > > wrote: > > > > > > > > > >> Thanks for adding more context @Jamie Grier <jgr...@lyft.com> . > > > > >> > > > > >> JIRA for this feature: > > > > https://issues.apache.org/jira/browse/FLINK-11501. > > > > >> > > > > >> Thanks > > > > >> Lakshmi > > > > >> > > > > >> On Thu, Jan 31, 2019 at 3:20 PM Thomas Weise <t...@apache.org> > > wrote: > > > > >> > > > > >>> I think it would be reasonable to have a rate limiter option in > the > > > > >>> consumer, given that others have also looked to solve this. > > > > >>> > > > > >>> I think for this and other optional features, it would be good to > > > > >> implement > > > > >>> in a way that overrides are possible. Someone else may want to do > > the > > > > >>> limiting differently, taking into account more/other factors. > > > > >>> > > > > >>> Both, adding the limiter and making the consumer code more > > adoptable > > > > >> could > > > > >>> be split into separate work also. > > > > >>> > > > > >>> BTW is there a JIRA for this? > > > > >>> > > > > >>> Thomas > > > > >>> > > > > >> > > > > >> > > > > >> -- > > > > >> *Lakshmi Gururaja Rao* > > > > >> SWE > > > > >> 217.778.7218 <+12177787218> > > > > >> [image: Lyft] <http://www.lyft.com/> > > > > >> > > > > > > > > -------------------------- > > > > Ken Krugler > > > > +1 530-210-6378 > > > > http://www.scaleunlimited.com > > > > Custom big data solutions & training > > > > Flink, Solr, Hadoop, Cascading & Cassandra > > > > > > > > > > > > > >