First of all, my sincere apologies for the slow progress on this one. @Junrui Lee <jrlee....@gmail.com> and Xia,
I updated the PR with respect to the last set of feedback comments. Please take a look. I am hoping to finish this one quickly.

Regards,
Venkata krishnan

On Thu, May 9, 2024 at 4:35 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Xia,
>
> Thanks for the reviews. Unfortunately, due to work commitments I am a little delayed in addressing your review comments. It should mostly be done by the end of this week. Just a quick heads up.
>
> Junrui,
>
> Thanks, that would be great.

On Mon, May 6, 2024 at 12:45 AM Junrui Lee <jrlee....@gmail.com> wrote:

> Hi,
> Thanks for the reminder. I will review it soon during my free time.

On Sat, May 4, 2024 at 10:10 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Junrui and Xia,
>
> Gentle ping for reviews.

On Mon, Apr 29, 2024 at 8:28 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Hi Xia and Junrui,
>
> Filed https://github.com/apache/flink/pull/24736 to address the above described issue. Please take a look whenever you can.
>
> Thanks,
> Venkat

On Thu, Apr 18, 2024 at 12:16 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the above described issue. Will share the PR here once it is ready for review.
>
> Regards,
> Venkata krishnan

On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com> wrote:

> Thanks Venkata and Xia for providing further clarification. I think your example illustrates the significance of this proposal very well.
> Please feel free to go ahead and address the concerns.
>
> Best,
> Junrui

On Tue, Apr 16, 2024 at 7:01 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Thanks for adding your thoughts to this discussion.
>
> If we all agree that the source vertex parallelism shouldn't be bound by the downstream max parallelism (jobmanager.adaptive-batch-scheduler.max-parallelism), based on the rationale and the issues described above, I can take a stab at addressing the issue.
>
> Let me file a ticket to track this issue. Otherwise, I'm looking forward to hearing more thoughts from others as well, especially Lijie and Junrui, who have more context on the AdaptiveBatchScheduler.
>
> Regards,
> Venkata krishnan

On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:

> Hi Venkat,
>
> I agree that the parallelism of the source vertex should not be upper bounded by the job's global max parallelism. The case you mentioned, "High filter selectivity with huge amounts of data to read", supports this viewpoint excellently. (In fact, in the current implementation, if the source parallelism is pre-specified at job creation, rather than relying on the dynamic parallelism inference of the AdaptiveBatchScheduler, the source vertex's parallelism can indeed exceed the job's global max parallelism.)
>
> As Lijie and Junrui pointed out, the key issue is "semantic consistency."
> Currently, if a vertex has not set maxParallelism, the AdaptiveBatchScheduler will use `execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's maxParallelism. Since the current implementation does not distinguish between source vertices and downstream vertices, source vertices are also subject to this limitation.
>
> Therefore, I believe that if the issue of "semantic consistency" can be explained well in the code and configuration documentation, the AdaptiveBatchScheduler should allow the parallelism of source vertices to exceed the job's global max parallelism.
>
> Best,
> Xia

On Sun, Apr 14, 2024 at 10:31 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Let me state why I think "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism":
>
>    - The source vertex is unique and does not have any upstream vertices.
>    - Downstream vertices read shuffled data partitioned by key, which is not the case for the source vertex.
>    - Limiting source parallelism by the downstream vertices' max parallelism is incorrect.
>
> If we say that, for "semantic consistency", the source vertex parallelism has to be bound by the overall job's max parallelism, it can lead to the following issues:
>
>    - High filter selectivity with huge amounts of data to read: setting "jobmanager.adaptive-batch-scheduler.max-parallelism" high so that the source parallelism can be set higher can lead to small blocks and sub-optimal performance.
>    - Setting "jobmanager.adaptive-batch-scheduler.max-parallelism" high requires careful tuning of network buffer configurations, which is unnecessary where it is not otherwise required, just so that the source parallelism can be set high.
>
> Regards,
> Venkata krishnan

On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:

> Hello Venkata krishnan,
>
> I think the "semantic consistency" here refers to maintaining a uniform upper limit on parallelism, defined by jobmanager.adaptive-batch-scheduler.max-parallelism, across all vertices within a job.
> As the source vertices are part of the global execution graph, they should also respect this rule to ensure consistent application of parallelism constraints.
>
> Best,
> Junrui

On Fri, Apr 12, 2024 at 2:10 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as well.
>
> Regards,
> Venkata krishnan

On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few follow-up questions.
>
> > Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
>
> Lijie, can you please elaborate on the above comment further? What do you mean when you say it will lead to "semantic inconsistency"?
>
> > Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process.
>
> This makes sense in the context of any vertices other than the source vertex. As you mentioned above ("Source can actually ignore this limit because it has no upstream"), I therefore feel "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not be upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
>
> Regards,
> Venkata krishnan

On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:

> Hi Venkat,
>
> As Lijie mentioned, in Flink the parallelism is required to be less than or equal to the maximum parallelism. The config options jobmanager.adaptive-batch-scheduler.default-source-parallelism and jobmanager.adaptive-batch-scheduler.max-parallelism will be set as the source's parallelism and max parallelism, respectively. Therefore, the check failure you encountered is in line with expectations.
>
> Best,
> Junrui

On Thu, Feb 29, 2024 at 5:35 PM Lijie Wang <wangdachui9...@gmail.com> wrote:

> Hi Venkat,
>
> >> default-source-parallelism config should be independent from the max-parallelism
>
> Actually, it's not.
>
> Firstly, it's obvious that the parallelism should be less than or equal to the max parallelism (both literally and in execution). "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the max parallelism for a vertex if you don't set a max parallelism for it individually (just like the source in your case).
>
> Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process. The source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
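Lijie's second point can be illustrated with a small sketch. This is not Flink's actual partition-assignment code; the class name, method, and the round-robin assignment below are purely illustrative. The idea: if a downstream vertex could run with more tasks than the upstream produces subpartitions, some downstream tasks would receive no data, which is why the max-parallelism limit must be enforced for non-source vertices.

```java
// Illustrative sketch only; not Flink's actual assignment logic.
public class SubpartitionSketch {

    // The upstream vertex produces a fixed number of subpartitions; each
    // downstream task is assigned some of them (round-robin here, for
    // illustration). Returns how many subpartitions each downstream task gets.
    public static int[] subpartitionCounts(int numSubpartitions, int downstreamParallelism) {
        int[] counts = new int[downstreamParallelism];
        for (int sp = 0; sp < numSubpartitions; sp++) {
            counts[sp % downstreamParallelism]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 4 subpartitions but 6 downstream tasks: tasks 4 and 5 get nothing.
        int[] counts = subpartitionCounts(4, 6);
        for (int i = 0; i < counts.length; i++) {
            System.out.println("task " + i + " -> " + counts[i] + " subpartitions");
        }
    }
}
```

With 4 subpartitions and 6 downstream tasks, two tasks end up with zero subpartitions, which is exactly the "no data to process" situation described above.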
>
> Best,
> Lijie

On Thu, Feb 29, 2024 at 5:49 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Hi Flink devs,
>
> With Flink's AdaptiveBatchScheduler <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler> (note: this is different from the AdaptiveScheduler <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>), the scheduler automatically determines the correct number of downstream tasks required to process the shuffle generated by the upstream vertex.
>
> I have a question regarding the current behavior. There are 2 configs in interplay here:
>
> 1.
jobmanager.adaptive-batch-scheduler.default-source-parallelism
>    <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
>    - the default parallelism of data sources.
> 2. jobmanager.adaptive-batch-scheduler.max-parallelism
>    <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
>    - the upper bound of parallelism allowed to be set adaptively.
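For concreteness, the interplay can be written as a flink-conf.yaml fragment. The option keys are the two discussed in this thread; the numeric values are hypothetical, chosen only to show the conflicting combination:

```yaml
# Hypothetical values, for illustration only.
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 2000
jobmanager.adaptive-batch-scheduler.max-parallelism: 1000
# With this combination, the source's parallelism (2000) exceeds the
# max parallelism applied to every vertex (1000), and the job fails
# with the check described below.
```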
>
> Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism" is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the Flink application fails with the below message:
>
> "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
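A minimal, self-contained sketch of the kind of validation that produces this failure. This is an illustrative re-implementation, not the actual code in DefaultVertexParallelismInfo; the class and method names are hypothetical, and only the error message is taken from the thread:

```java
// Illustrative sketch of the check described above; names are hypothetical.
public class ParallelismCheckSketch {

    // The scheduler rejects any vertex whose parallelism exceeds its max
    // parallelism. When no per-vertex max is set, the global
    // jobmanager.adaptive-batch-scheduler.max-parallelism is used, which is
    // how the source ends up bounded by it.
    public static void checkVertexParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
    }

    public static void main(String[] args) {
        // Hypothetical values: default-source-parallelism (2000) greater than
        // max-parallelism (1000) reproduces the reported failure.
        try {
            checkVertexParallelism(2000, 1000);
        } catch (IllegalArgumentException e) {
            System.out.println("check failed: " + e.getMessage());
        }
    }
}
```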
>
> This is the corresponding check in Flink's DefaultVertexParallelismInfo <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
>
> My question is: shouldn't the "default-source-parallelism" config be independent from the "max-parallelism" flag? The former controls the default source parallelism, while the latter controls the max number of partitions to which the intermediate shuffle is written.
>
> If this is true, then the above check should be fixed. Otherwise, I wanted to understand why "default-source-parallelism" should be less than "max-parallelism".
>
> Thanks,
> Venkat