Unsubscribe

魏宪党
[email protected]



        



         原始邮件
         
       
发件人:Arvid Heise <[email protected]&gt;
发件时间:2025年8月5日 17:48
收件人:dev <[email protected]&gt;
主题:Re: Re: [DISCUSS] FLIP-535:Introduce RateLimiter to Source



       Hi&nbsp;Zexian,

the&nbsp;general&nbsp;idea&nbsp;and&nbsp;approach&nbsp;LGTM.&nbsp;A&nbsp;couple&nbsp;of&nbsp;questions:
*&nbsp;If&nbsp;we&nbsp;don't&nbsp;want&nbsp;to&nbsp;provide&nbsp;RateLimiter&nbsp;on&nbsp;operator&nbsp;level&nbsp;(seems&nbsp;to&nbsp;be
out&nbsp;of&nbsp;scope&nbsp;for&nbsp;this&nbsp;FLIP),&nbsp;can&nbsp;we&nbsp;still&nbsp;make&nbsp;the&nbsp;RateLimiter&nbsp;a&nbsp;property
of&nbsp;the&nbsp;Source&nbsp;similar&nbsp;to&nbsp;Watermark&nbsp;strategy&nbsp;and&nbsp;pass&nbsp;it&nbsp;through&nbsp;the
implementation?&nbsp;I'm&nbsp;guessing&nbsp;that&nbsp;we&nbsp;will&nbsp;then&nbsp;only&nbsp;have&nbsp;very&nbsp;coarse-grain
control&nbsp;because&nbsp;it's&nbsp;basically&nbsp;implemented&nbsp;on&nbsp;SourceOperator&nbsp;and&nbsp;hence&nbsp;can
limit&nbsp;batches&nbsp;only.&nbsp;The&nbsp;upside&nbsp;is&nbsp;that&nbsp;we&nbsp;don't&nbsp;need&nbsp;to&nbsp;touch&nbsp;each&nbsp;and
every&nbsp;connector&nbsp;and&nbsp;get&nbsp;something&nbsp;"for&nbsp;free"&nbsp;that&nbsp;probably&nbsp;works&nbsp;good
enough&nbsp;for&nbsp;many&nbsp;cases.
*&nbsp;We&nbsp;could&nbsp;additionally&nbsp;introduce&nbsp;some&nbsp;kind&nbsp;of&nbsp;mixin&nbsp;to&nbsp;the&nbsp;SourceReader
interface&nbsp;that&nbsp;would&nbsp;be&nbsp;able&nbsp;to&nbsp;push&nbsp;down&nbsp;the&nbsp;RateLimiter&nbsp;into&nbsp;the
connector.&nbsp;That&nbsp;would&nbsp;enable&nbsp;fine-grain&nbsp;control&nbsp;as&nbsp;you&nbsp;proposed.
*&nbsp;Note&nbsp;that&nbsp;each&nbsp;and&nbsp;every&nbsp;change&nbsp;to&nbsp;the&nbsp;public&nbsp;APIs&nbsp;will&nbsp;break&nbsp;connectors
in&nbsp;a&nbsp;way&nbsp;(your&nbsp;migration&nbsp;section&nbsp;is&nbsp;a&nbsp;bit&nbsp;slim&nbsp;on&nbsp;that&nbsp;part).&nbsp;In&nbsp;this&nbsp;case,
if&nbsp;you&nbsp;add&nbsp;a&nbsp;new&nbsp;overload&nbsp;to&nbsp;the&nbsp;ctor&nbsp;of&nbsp;SourceReaderBase,&nbsp;implementing
connectors&nbsp;will&nbsp;not&nbsp;run&nbsp;on&nbsp;Flink&nbsp;2.1-&nbsp;anymore.&nbsp;Unfortunately,&nbsp;mixins&nbsp;have
the&nbsp;same&nbsp;issue.&nbsp;I&nbsp;have&nbsp;yet&nbsp;to&nbsp;find&nbsp;a&nbsp;good&nbsp;solution&nbsp;to&nbsp;support&nbsp;features
optionally,&nbsp;such&nbsp;that&nbsp;the&nbsp;source&nbsp;runs&nbsp;fine&nbsp;on&nbsp;older&nbsp;versions&nbsp;(without&nbsp;rate
limiting)&nbsp;and&nbsp;on&nbsp;newer&nbsp;versions.&nbsp;I&nbsp;guess&nbsp;the&nbsp;only&nbsp;way&nbsp;would&nbsp;be&nbsp;some
reflection&nbsp;magic&nbsp;and&nbsp;I'd&nbsp;rather&nbsp;have&nbsp;less&nbsp;of&nbsp;that&nbsp;in&nbsp;Flink&nbsp;than&nbsp;more.
*&nbsp;I'm&nbsp;not&nbsp;sold&nbsp;on&nbsp;the&nbsp;idea&nbsp;that&nbsp;we&nbsp;need&nbsp;to&nbsp;pass&nbsp;notifyCheckpointAborted&nbsp;to
the&nbsp;RateLimiter&nbsp;for&nbsp;the&nbsp;sources.&nbsp;Can&nbsp;you&nbsp;expand&nbsp;on&nbsp;why&nbsp;this&nbsp;is&nbsp;needed?
Checkpoint&nbsp;barrier&nbsp;is&nbsp;injected&nbsp;differently&nbsp;into&nbsp;the&nbsp;sources&nbsp;than&nbsp;other
operators.&nbsp;notifyCheckpointAborted&nbsp;is&nbsp;also&nbsp;not&nbsp;guaranteed&nbsp;to&nbsp;be&nbsp;called,&nbsp;so
if&nbsp;there&nbsp;is&nbsp;a&nbsp;risk&nbsp;of&nbsp;deadlock,&nbsp;we&nbsp;should&nbsp;find&nbsp;other&nbsp;options.

Best,

Arvid

On&nbsp;Tue,&nbsp;Aug&nbsp;5,&nbsp;2025&nbsp;at&nbsp;8:48 
AM&nbsp;Jiangang&nbsp;Liu&nbsp;<[email protected]&gt;
wrote:

&gt;&nbsp;Thanks&nbsp;for&nbsp;your&nbsp;explanation.
&gt;&nbsp;1.&nbsp;It&nbsp;makes&nbsp;sense&nbsp;to&nbsp;me&nbsp;to&nbsp;solve&nbsp;the&nbsp;imbalance&nbsp;in&nbsp;other&nbsp;issues.
&gt;&nbsp;2.&nbsp;In&nbsp;fact,&nbsp;we&nbsp;support&nbsp;to&nbsp;adjust&nbsp;the&nbsp;source's&nbsp;qps&nbsp;dynamically&nbsp;in&nbsp;our&nbsp;inner
&gt;&nbsp;flink.&nbsp;We&nbsp;can&nbsp;support&nbsp;this&nbsp;feature&nbsp;later.
&gt;
&gt;&nbsp;Best,
&gt;&nbsp;Jiangang&nbsp;Liu
&gt;
&gt;&nbsp;Zexian&nbsp;WU&nbsp;<[email protected]&gt;&nbsp;于2025年8月5日周二&nbsp;14:35写道:
&gt;
&gt;&nbsp;&gt;&nbsp;Hi&nbsp;Jiangang,
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Thank&nbsp;you&nbsp;very&nbsp;much&nbsp;for&nbsp;your&nbsp;valuable&nbsp;feedback&nbsp;and&nbsp;insightful&nbsp;questions.
&gt;&nbsp;&gt;&nbsp;The&nbsp;two&nbsp;scenarios&nbsp;you've&nbsp;distilled&nbsp;from&nbsp;real-world&nbsp;production&nbsp;experience
&gt;&nbsp;&gt;&nbsp;are&nbsp;extremely&nbsp;helpful&nbsp;for&nbsp;improving&nbsp;this&nbsp;feature.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;1.&nbsp;Regarding&nbsp;the&nbsp;load&nbsp;imbalance&nbsp;issue&nbsp;between&nbsp;source&nbsp;tasks:
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Your&nbsp;point&nbsp;about&nbsp;load&nbsp;imbalance&nbsp;between&nbsp;source&nbsp;tasks&nbsp;potentially&nbsp;leading
&gt;&nbsp;to
&gt;&nbsp;&gt;&nbsp;a&nbsp;suboptimal&nbsp;rate-limiting&nbsp;effect&nbsp;is&nbsp;absolutely&nbsp;correct.&nbsp;Ideally,&nbsp;a
&gt;&nbsp;&gt;&nbsp;mechanism&nbsp;that&nbsp;can&nbsp;dynamically&nbsp;allocate&nbsp;the&nbsp;rate&nbsp;based&nbsp;on&nbsp;the&nbsp;number&nbsp;of
&gt;&nbsp;&gt;&nbsp;splits&nbsp;assigned&nbsp;to&nbsp;each&nbsp;subtask&nbsp;would&nbsp;certainly&nbsp;be&nbsp;a&nbsp;superior&nbsp;solution.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;In&nbsp;fact,&nbsp;in&nbsp;the&nbsp;current&nbsp;design,&nbsp;we've&nbsp;introduced&nbsp;the&nbsp;notifyAddingSplit
&gt;&nbsp;&gt;&nbsp;interface,&nbsp;which&nbsp;provides&nbsp;the&nbsp;possibility&nbsp;for&nbsp;some&nbsp;level&nbsp;of&nbsp;local&nbsp;dynamic
&gt;&nbsp;&gt;&nbsp;adjustment.&nbsp;However,&nbsp;to&nbsp;implement&nbsp;a&nbsp;globally&nbsp;unified,&nbsp;on-demand&nbsp;dynamic
&gt;&nbsp;&gt;&nbsp;allocation,&nbsp;it&nbsp;would&nbsp;indeed&nbsp;require&nbsp;introducing&nbsp;a&nbsp;new&nbsp;communication
&gt;&nbsp;channel
&gt;&nbsp;&gt;&nbsp;between&nbsp;the&nbsp;SourceReader&nbsp;and&nbsp;the&nbsp;existing&nbsp;Coordinator,&nbsp;which&nbsp;would&nbsp;add
&gt;&nbsp;&gt;&nbsp;implementation&nbsp;complexity&nbsp;and&nbsp;potential&nbsp;performance&nbsp;overhead.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Furthermore,&nbsp;a&nbsp;more&nbsp;direct&nbsp;optimization&nbsp;is&nbsp;to&nbsp;have&nbsp;the&nbsp;Enumerator&nbsp;in&nbsp;the
&gt;&nbsp;&gt;&nbsp;Coordinator&nbsp;distribute&nbsp;splits&nbsp;as&nbsp;evenly&nbsp;as&nbsp;possible.&nbsp;I've&nbsp;noticed&nbsp;that
&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;community&nbsp;is&nbsp;already&nbsp;working&nbsp;on&nbsp;this&nbsp;with&nbsp;the&nbsp;ongoing&nbsp;FLIP-537:
&gt;&nbsp;Enumerator
&gt;&nbsp;&gt;&nbsp;with&nbsp;Global&nbsp;Split&nbsp;Assignment&nbsp;Distribution&nbsp;for&nbsp;Balanced&nbsp;Split&nbsp;assignment.
&gt;&nbsp;&gt;&nbsp;Improving&nbsp;the&nbsp;split&nbsp;distribution&nbsp;logic&nbsp;seems&nbsp;like&nbsp;a&nbsp;simpler&nbsp;and&nbsp;more
&gt;&nbsp;direct
&gt;&nbsp;&gt;&nbsp;solution&nbsp;to&nbsp;the&nbsp;root&nbsp;problem&nbsp;than&nbsp;introducing&nbsp;a&nbsp;complex&nbsp;coordination
&gt;&nbsp;&gt;&nbsp;mechanism&nbsp;just&nbsp;for&nbsp;rate&nbsp;limiting.&nbsp;Therefore,&nbsp;at&nbsp;this&nbsp;stage,&nbsp;we&nbsp;have&nbsp;opted
&gt;&nbsp;&gt;&nbsp;for&nbsp;a&nbsp;simpler,&nbsp;self-contained&nbsp;approach&nbsp;as&nbsp;a&nbsp;robust&nbsp;first&nbsp;step.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;2.&nbsp;Regarding&nbsp;the&nbsp;generic,&nbsp;operator-level&nbsp;configuration:
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Regarding&nbsp;the&nbsp;generic&nbsp;configuration&nbsp;method&nbsp;you&nbsp;mentioned—using&nbsp;a
&gt;&nbsp;&gt;&nbsp;ConfigOption<Map&gt;&nbsp;to&nbsp;configure&nbsp;QPS&nbsp;for&nbsp;any&nbsp;operator—that&nbsp;is&nbsp;indeed&nbsp;a&nbsp;very
&gt;&nbsp;&gt;&nbsp;flexible&nbsp;and&nbsp;powerful&nbsp;design.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;I'd&nbsp;like&nbsp;to&nbsp;confirm&nbsp;with&nbsp;you:&nbsp;is&nbsp;the&nbsp;main&nbsp;purpose&nbsp;of&nbsp;your&nbsp;proposed
&gt;&nbsp;solution
&gt;&nbsp;&gt;&nbsp;also&nbsp;to&nbsp;enable&nbsp;dynamic&nbsp;rate&nbsp;limiting&nbsp;more&nbsp;conveniently&nbsp;(for&nbsp;example,
&gt;&nbsp;&gt;&nbsp;adjusting&nbsp;the&nbsp;rate&nbsp;at&nbsp;runtime&nbsp;via&nbsp;external&nbsp;signals)?&nbsp;This&nbsp;FLIP&nbsp;currently
&gt;&nbsp;&gt;&nbsp;focuses&nbsp;primarily&nbsp;on&nbsp;the&nbsp;source&nbsp;side,&nbsp;as&nbsp;it&nbsp;addresses&nbsp;the&nbsp;most&nbsp;immediate
&gt;&nbsp;&gt;&nbsp;pain&nbsp;point.&nbsp;Your&nbsp;idea&nbsp;is&nbsp;very&nbsp;insightful,&nbsp;and&nbsp;if&nbsp;our&nbsp;goals&nbsp;are&nbsp;aligned,
&gt;&nbsp;it
&gt;&nbsp;&gt;&nbsp;serves&nbsp;as&nbsp;an&nbsp;excellent&nbsp;reference&nbsp;for&nbsp;the&nbsp;future&nbsp;evolution&nbsp;of&nbsp;this
&gt;&nbsp;feature.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;again&nbsp;for&nbsp;your&nbsp;insightful&nbsp;contributions;&nbsp;they&nbsp;are&nbsp;crucial&nbsp;for
&gt;&nbsp;&gt;&nbsp;refining&nbsp;and&nbsp;evolving&nbsp;this&nbsp;feature.
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Best,
&gt;&nbsp;&gt;&nbsp;Zexian
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;Jiangang&nbsp;Liu&nbsp;<[email protected]&gt;&nbsp;于2025年8月4日周一&nbsp;23:13写道:
&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hello,&nbsp;Zexian.&nbsp;This&nbsp;is&nbsp;a&nbsp;great&nbsp;work&nbsp;for&nbsp;job's&nbsp;stability&nbsp;when&nbsp;failover
&gt;&nbsp;or
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;catching&nbsp;the&nbsp;lag.&nbsp;I&nbsp;just&nbsp;have&nbsp;some&nbsp;questions:
&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;1.&nbsp;For&nbsp;some&nbsp;source&nbsp;tasks,&nbsp;their&nbsp;consuming&nbsp;splits&nbsp;may&nbsp;be&nbsp;more&nbsp;than
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;others.&nbsp;The&nbsp;simple&nbsp;calculation&nbsp;QPS&nbsp;based&nbsp;on&nbsp;parallelism&nbsp;may&nbsp;cause
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;imbalance.&nbsp;Can&nbsp;we&nbsp;support&nbsp;a&nbsp;calculation&nbsp;based&nbsp;on&nbsp;the&nbsp;split&nbsp;number
&gt;&nbsp;as&nbsp;a
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;common&nbsp;way?
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;2.&nbsp;In&nbsp;our&nbsp;company,&nbsp;we&nbsp;support&nbsp;to&nbsp;config&nbsp;each&nbsp;operator's&nbsp;qps&nbsp;by
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;ConfigOption<Map&gt;.&nbsp;This&nbsp;is&nbsp;convenient&nbsp;if&nbsp;we&nbsp;can&nbsp;get&nbsp;the&nbsp;operatorId
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;easily
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;before&nbsp;submitting.&nbsp;But&nbsp;in&nbsp;open&nbsp;source,&nbsp;it&nbsp;may&nbsp;be&nbsp;hard&nbsp;to&nbsp;do&nbsp;so.&nbsp;I
&gt;&nbsp;&gt;&nbsp;wonder
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;whether&nbsp;there&nbsp;exist&nbsp;similar&nbsp;way&nbsp;so&nbsp;that&nbsp;we&nbsp;do&nbsp;not&nbsp;need&nbsp;to&nbsp;implement
&gt;&nbsp;&gt;&nbsp;each
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;connector.
&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Jiangang&nbsp;Liu
&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian&nbsp;WU&nbsp;<[email protected]&gt;&nbsp;于2025年8月4日周一&nbsp;14:35写道:
&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hi&nbsp;Shengkai,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thank&nbsp;you&nbsp;for&nbsp;your&nbsp;thoughtful&nbsp;feedback&nbsp;and&nbsp;excellent&nbsp;questions.&nbsp;These
&gt;&nbsp;&gt;&nbsp;are
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;all&nbsp;great&nbsp;points.&nbsp;Here&nbsp;are&nbsp;my&nbsp;thoughts:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Regarding&nbsp;the&nbsp;requestSize&nbsp;parameter&nbsp;and&nbsp;its&nbsp;behavior:&nbsp;requestSize&nbsp;is
&gt;&nbsp;&gt;&nbsp;not
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;a
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;request&nbsp;for&nbsp;permission&nbsp;but&nbsp;a&nbsp;post-facto&nbsp;count&nbsp;of&nbsp;records&nbsp;that&nbsp;have
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;already
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;been&nbsp;emitted.&nbsp;If&nbsp;this&nbsp;count&nbsp;(e.g.,&nbsp;3)&nbsp;exceeds&nbsp;the&nbsp;available&nbsp;rate
&gt;&nbsp;credit
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;(e.g.,&nbsp;for&nbsp;2&nbsp;records),&nbsp;RateLimiter#acquire&nbsp;will&nbsp;return&nbsp;a
&gt;&nbsp;&gt;&nbsp;CompletionStage
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;that&nbsp;completes&nbsp;in&nbsp;the&nbsp;future.&nbsp;This&nbsp;creates&nbsp;a&nbsp;non-blocking&nbsp;pause&nbsp;in
&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;SourceReader,&nbsp;allowing&nbsp;it&nbsp;to&nbsp;pay&nbsp;back&nbsp;the&nbsp;"time&nbsp;debt"&nbsp;and&nbsp;ensuring
&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;average&nbsp;rate&nbsp;is&nbsp;maintained.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Regarding&nbsp;notifyCheckpointAborted:&nbsp;You&nbsp;are&nbsp;absolutely&nbsp;right;&nbsp;this&nbsp;is
&gt;&nbsp;a
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;critical&nbsp;point&nbsp;for&nbsp;preventing&nbsp;deadlocks.&nbsp;The&nbsp;specific&nbsp;plan&nbsp;is&nbsp;to&nbsp;add
&gt;&nbsp;a
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;notifyCheckpointAborted&nbsp;method&nbsp;to&nbsp;the&nbsp;RateLimiter&nbsp;interface.&nbsp;Then,
&gt;&nbsp;&gt;&nbsp;within
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the&nbsp;SourceReader&nbsp;implementation,&nbsp;I&nbsp;will&nbsp;handle&nbsp;the&nbsp;checkpoint&nbsp;abort
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;callback&nbsp;and&nbsp;invoke&nbsp;this&nbsp;new&nbsp;method&nbsp;to&nbsp;reset&nbsp;the&nbsp;rate&nbsp;limiter's
&gt;&nbsp;&gt;&nbsp;internal
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;state.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Regarding&nbsp;configuration&nbsp;and&nbsp;unsupported&nbsp;connectors:&nbsp;You've&nbsp;made&nbsp;an
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;excellent&nbsp;point&nbsp;about&nbsp;this&nbsp;being&nbsp;a&nbsp;connector-specific&nbsp;feature.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I&nbsp;agree&nbsp;that&nbsp;scan.rate-limit.record-per-second&nbsp;is&nbsp;a&nbsp;great&nbsp;name,&nbsp;and&nbsp;I
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;will
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;propose&nbsp;it&nbsp;as&nbsp;a&nbsp;best&nbsp;practice.&nbsp;I&nbsp;plan&nbsp;to&nbsp;use&nbsp;it&nbsp;in&nbsp;our&nbsp;initial
&gt;&nbsp;&gt;&nbsp;reference
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;implementation,&nbsp;likely&nbsp;for&nbsp;the&nbsp;Kafka&nbsp;connector,&nbsp;and&nbsp;it&nbsp;can&nbsp;serve&nbsp;as&nbsp;a
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;reference&nbsp;for&nbsp;other&nbsp;connectors&nbsp;in&nbsp;the&nbsp;future.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;You&nbsp;are&nbsp;correct&nbsp;that&nbsp;the&nbsp;decision&nbsp;to&nbsp;support&nbsp;this&nbsp;feature&nbsp;and&nbsp;how&nbsp;to
&gt;&nbsp;&gt;&nbsp;name
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the&nbsp;option&nbsp;lies&nbsp;with&nbsp;each&nbsp;connector.&nbsp;The&nbsp;framework&nbsp;only&nbsp;provides&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;underlying&nbsp;mechanism&nbsp;(the&nbsp;RateLimiter&nbsp;integration).&nbsp;The&nbsp;mechanism&nbsp;for
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;providing&nbsp;clear&nbsp;error&nbsp;feedback&nbsp;works&nbsp;perfectly&nbsp;with&nbsp;this&nbsp;model:&nbsp;a
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;connector
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;that&nbsp;implements&nbsp;rate&nbsp;limiting&nbsp;will&nbsp;declare&nbsp;the&nbsp;option&nbsp;in&nbsp;its
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;DynamicTableFactory.&nbsp;Connectors&nbsp;that&nbsp;don't&nbsp;support&nbsp;it&nbsp;will&nbsp;not
&gt;&nbsp;declare
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;option.&nbsp;Consequently,&nbsp;Flink’s&nbsp;built-in&nbsp;validation&nbsp;will&nbsp;automatically
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;throw
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;a&nbsp;ValidationException&nbsp;for&nbsp;an&nbsp;unknown&nbsp;option,&nbsp;providing&nbsp;the&nbsp;desired
&gt;&nbsp;user
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;feedback.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Shengkai&nbsp;Fang&nbsp;<[email protected]&gt;&nbsp;于2025年7月28日周一&nbsp;14:57写道:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hi,&nbsp;Zexian.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;for&nbsp;your&nbsp;FLIP.&nbsp;I&nbsp;think&nbsp;rate&nbsp;litmit&nbsp;is&nbsp;very&nbsp;important&nbsp;feature
&gt;&nbsp;&gt;&nbsp;for
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;our
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;users.&nbsp;But&nbsp;I&nbsp;have&nbsp;some&nbsp;questions&nbsp;about&nbsp;the&nbsp;FLIP:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;1.&nbsp;How&nbsp;do&nbsp;I&nbsp;determine&nbsp;the&nbsp;input&nbsp;parameter&nbsp;`requestSize`&nbsp;for
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;`RateLimiter#acquire`?&nbsp;If&nbsp;the&nbsp;rate&nbsp;limiter&nbsp;indicates&nbsp;there&nbsp;are&nbsp;2
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;remaining
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;requests&nbsp;that&nbsp;can&nbsp;be&nbsp;emitted,&nbsp;but&nbsp;the&nbsp;`requestSize`&nbsp;is&nbsp;3,&nbsp;what&nbsp;is
&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;behavior&nbsp;here?
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;2.&nbsp;CheckpointListener&nbsp;also&nbsp;has&nbsp;a&nbsp;method&nbsp;named
&gt;&nbsp;&gt;&nbsp;notifyCheckpointAborted,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;think&nbsp;RateLimiter&nbsp;also&nbsp;needs&nbsp;this.&nbsp;If&nbsp;the&nbsp;checkpoint&nbsp;aborts,&nbsp;please
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;clear
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the&nbsp;status&nbsp;of&nbsp;the&nbsp;rate&nbsp;limiter.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;3.&nbsp;I&nbsp;think&nbsp;`scan.rate-limit.record-per-second`&nbsp;is&nbsp;better&nbsp;than
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;`scan.rate.limit.record-per-second`.&nbsp;It&nbsp;seems&nbsp;only&nbsp;FLIP-27&nbsp;source
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;supports
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate-limit,&nbsp;it's&nbsp;better&nbsp;we&nbsp;can&nbsp;throw&nbsp;an&nbsp;exception&nbsp;to&nbsp;notify&nbsp;users
&gt;&nbsp;if
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;source&nbsp;doesn't&nbsp;support&nbsp;this&nbsp;feature.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Shengkai
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian&nbsp;WU&nbsp;<[email protected]&gt;&nbsp;于2025年7月28日周一&nbsp;11:52写道:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hi&nbsp;Leonard,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;a&nbsp;lot&nbsp;for&nbsp;your&nbsp;support&nbsp;and&nbsp;positive&nbsp;feedback!&nbsp;I'm&nbsp;glad&nbsp;to
&gt;&nbsp;&gt;&nbsp;hear
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;you
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;think&nbsp;the&nbsp;design&nbsp;meets&nbsp;the&nbsp;needs&nbsp;of&nbsp;most&nbsp;scenarios.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Indeed,&nbsp;rate&nbsp;limiting&nbsp;is&nbsp;a&nbsp;fundamental&nbsp;and&nbsp;important&nbsp;feature,&nbsp;and
&gt;&nbsp;&gt;&nbsp;I'm
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;excited&nbsp;to&nbsp;help&nbsp;fill&nbsp;this&nbsp;gap.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I'm&nbsp;also&nbsp;looking&nbsp;forward&nbsp;to&nbsp;more&nbsp;ideas&nbsp;and&nbsp;potential&nbsp;improvements
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;from
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;other&nbsp;community&nbsp;members.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;again!
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;On&nbsp;2025/07/24&nbsp;12:19:39&nbsp;Leonard&nbsp;Xu&nbsp;wrote:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;Zexian&nbsp;for&nbsp;driving&nbsp;this&nbsp;work.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Rate&nbsp;limiting&nbsp;is&nbsp;a&nbsp;common&nbsp;requirement,&nbsp;TBH,&nbsp;we&nbsp;should&nbsp;have
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;supported
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;it
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;in&nbsp;earlier&nbsp;stage,&nbsp;and&nbsp;the&nbsp;proposed&nbsp;design&nbsp;integrating&nbsp;it&nbsp;into&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;source
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;operator&nbsp;lifecycle,&nbsp;it&nbsp;is&nbsp;already&nbsp;able&nbsp;to&nbsp;meet&nbsp;the&nbsp;vast&nbsp;majority
&gt;&nbsp;of
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;scenarios,&nbsp;looks&nbsp;good&nbsp;from&nbsp;my&nbsp;side.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Leonard
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;2025&nbsp;7月&nbsp;18&nbsp;12:01,Zexian&nbsp;WU&nbsp;<[email protected]&gt;&nbsp;写道:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hi&nbsp;everyone,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I&nbsp;would&nbsp;like&nbsp;to&nbsp;start&nbsp;a&nbsp;discussion&nbsp;on&nbsp;a&nbsp;new&nbsp;Flink&nbsp;Improvement
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Proposal
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;(FLIP),&nbsp;FLIP-535:&nbsp;Introduce&nbsp;RateLimiter&nbsp;to&nbsp;Source.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;full&nbsp;proposal&nbsp;can&nbsp;be&nbsp;found&nbsp;on&nbsp;the&nbsp;wiki:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;&nbsp;https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Motivation
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;In&nbsp;many&nbsp;production&nbsp;environments,&nbsp;Flink&nbsp;sources&nbsp;read&nbsp;from
&gt;&nbsp;shared
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;external&nbsp;systems&nbsp;(like&nbsp;Kafka,&nbsp;Pulsar,&nbsp;or&nbsp;databases)&nbsp;where
&gt;&nbsp;resources
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;are
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limited.&nbsp;An&nbsp;uncontrolled&nbsp;data&nbsp;ingestion&nbsp;rate&nbsp;can&nbsp;lead&nbsp;to&nbsp;critical
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;issues:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Resource&nbsp;Contention:&nbsp;A&nbsp;high-throughput&nbsp;Flink&nbsp;job&nbsp;can
&gt;&nbsp;saturate&nbsp;a
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;shared
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Kafka&nbsp;cluster's&nbsp;bandwidth,&nbsp;starving&nbsp;other&nbsp;essential&nbsp;services.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;System&nbsp;Instability:&nbsp;Aggressive&nbsp;polling&nbsp;from&nbsp;a&nbsp;database&nbsp;(e.g.,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;MySQL)
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;can&nbsp;overwhelm&nbsp;its&nbsp;IOPS,&nbsp;degrading&nbsp;performance&nbsp;for&nbsp;transactional
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;queries.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;This&nbsp;proposal&nbsp;aims&nbsp;to&nbsp;provide&nbsp;a&nbsp;built-in,&nbsp;precise,&nbsp;and
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;easy-to-use
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate-limiting&nbsp;mechanism&nbsp;for&nbsp;Flink&nbsp;Sources&nbsp;to&nbsp;ensure&nbsp;system
&gt;&nbsp;&gt;&nbsp;stability
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;and
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;fair&nbsp;resource&nbsp;sharing.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Proposed&nbsp;Solution
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;core&nbsp;idea&nbsp;is&nbsp;to&nbsp;integrate&nbsp;a&nbsp;flexible,&nbsp;record-based
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate-limiting
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;mechanism&nbsp;directly&nbsp;into&nbsp;the&nbsp;SourceReaderBase,&nbsp;making&nbsp;it&nbsp;available
&gt;&nbsp;&gt;&nbsp;to
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;all
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;connectors&nbsp;built&nbsp;on&nbsp;the&nbsp;new&nbsp;source&nbsp;interface.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Key&nbsp;Changes:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Seamless&nbsp;Integration&nbsp;via&nbsp;SourceReaderBase:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;New&nbsp;constructors&nbsp;accepting&nbsp;a&nbsp;RateLimiterStrategy&nbsp;will&nbsp;be
&gt;&nbsp;added
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;directly&nbsp;to&nbsp;SourceReaderBase.&nbsp;This&nbsp;allows&nbsp;any&nbsp;connector&nbsp;built&nbsp;on
&gt;&nbsp;&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;modern
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;source&nbsp;interface&nbsp;(like&nbsp;Kafka&nbsp;or&nbsp;Pulsar)&nbsp;to&nbsp;enable&nbsp;rate&nbsp;limiting
&gt;&nbsp;&gt;&nbsp;with
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;minimal&nbsp;code&nbsp;changes.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Accurate,&nbsp;Post-Emission&nbsp;Throttling:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;To&nbsp;support&nbsp;sources&nbsp;with&nbsp;unpredictable&nbsp;batch&nbsp;sizes&nbsp;(e.g.,
&gt;&nbsp;&gt;&nbsp;Kafka),
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limiting&nbsp;is&nbsp;applied&nbsp;after&nbsp;records&nbsp;are&nbsp;emitted.&nbsp;The&nbsp;reader&nbsp;counts
&gt;&nbsp;&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;records&nbsp;after&nbsp;each&nbsp;recordEmitter.emitRecord&nbsp;method&nbsp;call,&nbsp;counts
&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;records,&nbsp;and&nbsp;only&nbsp;then&nbsp;consults&nbsp;the&nbsp;RateLimiter.&nbsp;This&nbsp;ensures
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;throttling
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;is
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;based&nbsp;on&nbsp;the&nbsp;precise&nbsp;number&nbsp;of&nbsp;records&nbsp;processed.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Fully&nbsp;Non-Blocking&nbsp;Operation:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;entire&nbsp;mechanism&nbsp;is&nbsp;asynchronous&nbsp;and&nbsp;non-blocking.&nbsp;If&nbsp;a
&gt;&nbsp;&gt;&nbsp;rate
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limit
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;is&nbsp;hit,&nbsp;the&nbsp;reader&nbsp;pauses&nbsp;by&nbsp;returning
&gt;&nbsp;InputStatus.MORE_AVAILABLE.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;This
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;yields&nbsp;control&nbsp;to&nbsp;the&nbsp;Flink&nbsp;task's&nbsp;event&nbsp;loop&nbsp;without&nbsp;blocking
&gt;&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;processing&nbsp;thread,&nbsp;ensuring&nbsp;that&nbsp;critical&nbsp;operations&nbsp;like
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;checkpointing
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;are
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;never&nbsp;delayed.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Unified&nbsp;Configuration&nbsp;via&nbsp;SQL/Table&nbsp;API:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Users&nbsp;can&nbsp;configure&nbsp;rate&nbsp;limits&nbsp;consistently&nbsp;across&nbsp;different
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;sources
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;using&nbsp;a&nbsp;simple&nbsp;SQL&nbsp;table&nbsp;option,&nbsp;such&nbsp;as&nbsp;WITH&nbsp;('scan.rate.limit'
&gt;&nbsp;=
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;'1000').
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;This&nbsp;provides&nbsp;a&nbsp;unified&nbsp;and&nbsp;user-friendly&nbsp;experience&nbsp;for&nbsp;pipeline
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;tuning.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;design&nbsp;is&nbsp;backward-compatible,&nbsp;and&nbsp;existing&nbsp;custom
&gt;&nbsp;sources
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;will
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;continue&nbsp;to&nbsp;work&nbsp;without&nbsp;any&nbsp;modification.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I&nbsp;believe&nbsp;this&nbsp;feature&nbsp;will&nbsp;be&nbsp;a&nbsp;valuable&nbsp;addition&nbsp;for&nbsp;Flink
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;users
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;operating&nbsp;in&nbsp;complex,&nbsp;multi-tenant&nbsp;environments.&nbsp;I'm&nbsp;looking
&gt;&nbsp;&gt;&nbsp;forward
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;to
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;your&nbsp;feedback,&nbsp;suggestions,&nbsp;and&nbsp;any&nbsp;potential&nbsp;concerns&nbsp;you&nbsp;might
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;have.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian&nbsp;Wu
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;
&gt;

Reply via email to