I initially thought of an approach similar to the Collector idea: overriding emitRecord in the fetcher. That makes counting bytes difficult, though, because it's downstream of decoding.
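The byte-counting alternative can be sketched as follows: throttle on the raw payload size before it is decoded. This is a minimal sketch under stated assumptions, not connector code. ByteRateLimiter, TokenBucketByteLimiter and ThrottledFetchLoop are hypothetical names (not Flink or Kinesis connector APIs), and decoding to String stands in for the consumer's DeserializationSchema.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical pluggable limiter (not an existing Flink or connector API). */
interface ByteRateLimiter {
    /** Debits {@code bytes} and returns how many nanoseconds the caller should wait before emitting. */
    long acquire(int bytes);
}

/** Token bucket refilled at bytesPerSecond; it holds at most one second's worth of bytes. */
final class TokenBucketByteLimiter implements ByteRateLimiter {
    private final double bytesPerNano;
    private final double capacity;
    private final LongSupplier nanoClock; // injectable so the limiter can be tested deterministically
    private double tokens;
    private long lastRefillNanos;

    TokenBucketByteLimiter(long bytesPerSecond, LongSupplier nanoClock) {
        if (bytesPerSecond <= 0) throw new IllegalArgumentException("bytesPerSecond must be > 0");
        this.bytesPerNano = bytesPerSecond / 1e9;
        this.capacity = bytesPerSecond;
        this.nanoClock = nanoClock;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    @Override
    public synchronized long acquire(int bytes) {
        long now = nanoClock.getAsLong();
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * bytesPerNano);
        lastRefillNanos = now;
        tokens -= bytes; // may go negative; the deficit is repaid by waiting
        return tokens >= 0 ? 0L : (long) Math.ceil(-tokens / bytesPerNano);
    }
}

/** Simplified fetch loop: throttles on the raw payload size, then decodes. */
final class ThrottledFetchLoop {
    private final ByteRateLimiter limiter;

    ThrottledFetchLoop(ByteRateLimiter limiter) {
        this.limiter = limiter;
    }

    List<String> process(List<byte[]> rawRecords) throws InterruptedException {
        List<String> emitted = new ArrayList<>();
        for (byte[] raw : rawRecords) {
            long waitNanos = limiter.acquire(raw.length); // bytes are counted before decoding
            if (waitNanos > 0) {
                Thread.sleep(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
            }
            // Stand-in for the DeserializationSchema; a real consumer would emit T, not String.
            emitted.add(new String(raw, StandardCharsets.UTF_8));
        }
        return emitted;
    }
}

class ThrottledFetchLoopDemo {
    public static void main(String[] args) throws InterruptedException {
        // 10 bytes/second with a 10-byte bucket: the third 5-byte record waits about 0.5 s.
        ThrottledFetchLoop loop = new ThrottledFetchLoop(new TokenBucketByteLimiter(10, System::nanoTime));
        List<byte[]> raw = List.of(
                "hello".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8),
                "again".getBytes(StandardCharsets.UTF_8));
        System.out.println(loop.process(raw));
    }
}
```

The limiter returns a wait time instead of sleeping itself, which keeps the policy testable and leaves the caller in control of where the wait happens. In a real source the wait would presumably need to happen outside the checkpoint lock, so that checkpoints are not blocked while the fetcher is throttled.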
Another idea for solving this in a reusable way was to chain a separate rate-limiting operator downstream of the consumer, which would build up backpressure and slow the consumer down. However, wouldn't that interfere with checkpoint barrier alignment? (AFAIK, checkpoint barriers would currently also get stuck in the backlog.)

Thomas

On Thu, Jan 31, 2019 at 7:13 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:

> +1, and something I was planning to comment on in the Jira issue.
>
> Also, if rate limiting could effectively stop the stream, then this could
> be used to solve a common data enrichment issue.
>
> Logically you want to pause one stream (typically the time series data
> being processed) while another stream (typically the broadcast) is
> broadcasting an update to enrichment data.
>
> Currently you have to buffer the time series data in your enrichment
> function, but if the rate limiter was pluggable, it could detect when this
> enrichment update was happening.
>
> — Ken
>
> > On Jan 31, 2019, at 6:10 PM, Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi Jamie,
> >
> > Thanks for the explanation. That makes sense to me. I am wondering if there
> > is a more general way to add a rate limiter to all the connectors rather
> > than doing that for each individual one. For example, maybe we can have the
> > rate limiting logic in the Collector / Output, thus all the connectors
> > (even operators?) could be rate limited.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Feb 1, 2019 at 7:23 AM Lakshmi Gururaja Rao <l...@lyft.com.invalid>
> > wrote:
> >
> >> Thanks for adding more context @Jamie Grier <jgr...@lyft.com>.
> >>
> >> JIRA for this feature: https://issues.apache.org/jira/browse/FLINK-11501.
> >>
> >> Thanks
> >> Lakshmi
> >>
> >> On Thu, Jan 31, 2019 at 3:20 PM Thomas Weise <t...@apache.org> wrote:
> >>
> >>> I think it would be reasonable to have a rate limiter option in the
> >>> consumer, given that others have also looked to solve this.
> >>>
> >>> I think for this and other optional features, it would be good to implement
> >>> in a way that overrides are possible. Someone else may want to do the
> >>> limiting differently, taking into account more/other factors.
> >>>
> >>> Both adding the limiter and making the consumer code more adaptable could
> >>> be split into separate work also.
> >>>
> >>> BTW is there a JIRA for this?
> >>>
> >>> Thomas
> >>>
> >>
> >> --
> >> Lakshmi Gururaja Rao
> >> SWE
> >> 217.778.7218
> >>
> >
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
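Becket's suggestion of putting the limit in the Collector / Output could look roughly like the sketch below, assuming a wrapper around whatever collector the connector emits into. Collector here is a local stand-in mirroring org.apache.flink.util.Collector (collect and close) so the code compiles without Flink; EmitPolicy, FixedRatePolicy and RateLimitedCollector are hypothetical names, not existing Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Local stand-in mirroring org.apache.flink.util.Collector so the sketch compiles without Flink. */
interface Collector<T> {
    void collect(T record);
    void close();
}

/** Hypothetical pluggable policy: how long (in nanoseconds) to wait before emitting this record. */
interface EmitPolicy<T> {
    long nanosToWait(T record);
}

/** Spaces records evenly at a fixed records-per-second rate. */
final class FixedRatePolicy<T> implements EmitPolicy<T> {
    private final long intervalNanos;
    private final LongSupplier nanoClock; // injectable for deterministic tests
    private long nextSlotNanos;

    FixedRatePolicy(long recordsPerSecond, LongSupplier nanoClock) {
        if (recordsPerSecond <= 0) throw new IllegalArgumentException("recordsPerSecond must be > 0");
        this.intervalNanos = 1_000_000_000L / recordsPerSecond;
        this.nanoClock = nanoClock;
        this.nextSlotNanos = nanoClock.getAsLong();
    }

    @Override
    public synchronized long nanosToWait(T record) {
        long now = nanoClock.getAsLong();
        long slot = Math.max(now, nextSlotNanos); // idle time does not accumulate into a burst
        nextSlotNanos = slot + intervalNanos;
        return slot - now;
    }
}

/** Wraps any Collector and throttles it, so the limiting does not depend on the connector. */
final class RateLimitedCollector<T> implements Collector<T> {
    private final Collector<T> delegate;
    private final EmitPolicy<T> policy;

    RateLimitedCollector(Collector<T> delegate, EmitPolicy<T> policy) {
        this.delegate = delegate;
        this.policy = policy;
    }

    @Override
    public void collect(T record) {
        long waitNanos = policy.nanosToWait(record);
        if (waitNanos > 0) {
            try {
                Thread.sleep(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag, then emit anyway
            }
        }
        delegate.collect(record);
    }

    @Override
    public void close() {
        delegate.close();
    }
}

class RateLimitedCollectorDemo {
    public static void main(String[] args) {
        Collector<String> printer = new Collector<String>() {
            @Override public void collect(String record) { System.out.println(record); }
            @Override public void close() {}
        };
        // 2 records/second: the three records are emitted roughly 500 ms apart.
        Collector<String> limited = new RateLimitedCollector<>(printer, new FixedRatePolicy<>(2, System::nanoTime));
        for (String r : List.of("a", "b", "c")) {
            limited.collect(r);
        }
        limited.close();
    }
}
```

Because the policy is an interface, it is the kind of override point Thomas describes: another implementation could weigh other factors or, along the lines of Ken's enrichment case, keep returning a wait while a hypothetical "update in progress" flag is set. Note that this limits records, not bytes, since a collector only sees decoded records.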