Xia, Thanks for the reviews. Unfortunately, due to work commitments, I am a little delayed in addressing your review comments. Most of them should be done by the end of this week. Just a quick heads up.
Junrui, Thanks, that would be great.

On Mon, May 6, 2024, 12:45 AM Junrui Lee <jrlee....@gmail.com> wrote:

> Hi,
> Thanks for the reminder. I will review it soon during my free time.
>
> On Sat, May 4, 2024 at 10:10, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>
> > Junrui and Xia
> >
> > Gentle ping for reviews.
> >
> > On Mon, Apr 29, 2024, 8:28 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> >
> > > Hi Xia and Junrui,
> > >
> > > Filed https://github.com/apache/flink/pull/24736 to address the above described issue. Please take a look whenever you can.
> > >
> > > Thanks
> > > Venkat
> > >
> > > On Thu, Apr 18, 2024 at 12:16 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > >
> > > > Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the above described issue. Will share the PR here once it is ready for review.
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > > On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com> wrote:
> > > >
> > > > > Thanks Venkata and Xia for providing further clarification. I think your example illustrates the significance of this proposal very well. Please feel free to go ahead and address the concerns.
> > > > >
> > > > > Best,
> > > > > Junrui
> > > > >
> > > > > On Tue, Apr 16, 2024 at 07:01, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > >
> > > > > > Thanks for adding your thoughts to this discussion.
> > > > > >
> > > > > > If we all agree that the source vertex parallelism shouldn't be bound by the downstream max parallelism (jobmanager.adaptive-batch-scheduler.max-parallelism) based on the rationale and the issues described above, I can take a stab at addressing the issue.
> > > > > >
> > > > > > Let me file a ticket to track this issue. Otherwise, I'm looking forward to hearing more thoughts from others as well, especially Lijie and Junrui who have more context on the AdaptiveBatchScheduler.
> > > > > >
> > > > > > Regards
> > > > > > Venkata krishnan
> > > > > >
> > > > > > On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Venkat,
> > > > > > > I agree that the parallelism of the source vertex should not be upper bounded by the job's global max parallelism. The case you mentioned, "High filter selectivity with huge amounts of data to read", strongly supports this viewpoint. (In fact, in the current implementation, if the source parallelism is pre-specified at the job creation stage, rather than relying on the dynamic parallelism inference of the AdaptiveBatchScheduler, the source vertex's parallelism can indeed exceed the job's global max parallelism.)
> > > > > > >
> > > > > > > As Lijie and Junrui pointed out, the key issue is "semantic consistency." Currently, if a vertex has not set maxParallelism, the AdaptiveBatchScheduler will use `execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's maxParallelism. Since the current implementation does not distinguish between source vertices and downstream vertices, source vertices are also subject to this limitation.
> > > > > > >
> > > > > > > Therefore, I believe that if the issue of "semantic consistency" can be well explained in the code and configuration documentation, the AdaptiveBatchScheduler should allow the parallelism of source vertices to exceed the job's global max parallelism.
> > > > > > >
> > > > > > > Best,
> > > > > > > Xia
> > > > > > >
> > > > > > > On Sun, Apr 14, 2024 at 10:31, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > > >
> > > > > > > > Let me state why I think "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > > > > > >
> > > > > > > > - Source vertex is unique and does not have any upstream vertices
> > > > > > > > - Downstream vertices read shuffled data partitioned by key, which is not the case for the Source vertex
> > > > > > > > - Limiting source parallelism by downstream vertices' max parallelism is incorrect
> > > > > > > >
> > > > > > > > If we say that for "semantic consistency" the source vertex parallelism has to be bound by the overall job's max parallelism, it can lead to the following issues:
> > > > > > > >
> > > > > > > > - High filter selectivity with huge amounts of data to read: setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" so that source parallelism can be set higher can lead to small blocks and sub-optimal performance.
> > > > > > > > - Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" requires careful tuning of network buffer configurations, which is an unnecessary burden when the only goal is to allow a high source parallelism.
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Venkata krishnan
> > > > > > > >
> > > > > > > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hello Venkata krishnan,
> > > > > > > > >
> > > > > > > > > I think the "semantic consistency" implied by jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining a uniform upper limit on parallelism across all vertices within a job. As the source vertices are part of the global execution graph, they should also respect this rule to ensure consistent application of parallelism constraints.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Junrui
> > > > > > > > >
> > > > > > > > > On Fri, Apr 12, 2024 at 02:10, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > > > > >
> > > > > > > > > > Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as well.
> > > > > > > > > >
> > > > > > > > > > Regards
> > > > > > > > > > Venkata krishnan
> > > > > > > > > >
> > > > > > > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the response Lijie and Junrui. Sorry for the late reply. A few follow-up questions.
> > > > > > > > > > >
> > > > > > > > > > > > Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
> > > > > > > > > > >
> > > > > > > > > > > Lijie, can you please elaborate on the above comment further? What do you mean when you say it will lead to "semantic inconsistency"?
> > > > > > > > > > >
> > > > > > > > > > > > Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process.
> > > > > > > > > > >
> > > > > > > > > > > This makes sense in the context of any vertex other than the source vertex. As you mentioned above ("Source can actually ignore this limit because it has no upstream"), I feel "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not be upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > > > > > > > > >
> > > > > > > > > > > Regards
> > > > > > > > > > > Venkata krishnan
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Venkat,
> > > > > > > > > > > >
> > > > > > > > > > > > As Lijie mentioned, in Flink, the parallelism is required to be less than or equal to the maximum parallelism. The config options jobmanager.adaptive-batch-scheduler.default-source-parallelism and jobmanager.adaptive-batch-scheduler.max-parallelism will be set as the source's parallelism and max-parallelism, respectively. Therefore, the failed check you encountered is in line with expectations.
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Junrui
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Feb 29, 2024 at 17:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Venkat,
> > > > > > > > > > > > >
> > > > > > > > > > > > > > default-source-parallelism config should be independent from the max-parallelism
> > > > > > > > > > > > >
> > > > > > > > > > > > > Actually, it's not.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Firstly, it's obvious that the parallelism should be less than or equal to the max parallelism (both literally and in execution). The "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the max parallelism for a vertex if you don't set max parallelism for it individually (just like the source in your case).
> > > > > > > > > > > > >
> > > > > > > > > > > > > Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process. Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > Lijie
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Feb 29, 2024 at 05:49, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Flink devs,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > With Flink's AdaptiveBatchScheduler <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler> (Note: this is different from AdaptiveScheduler <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>), the scheduler automatically determines the correct number of downstream tasks required to process the shuffle generated by the upstream vertex.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I have a question regarding the current behavior. There are 2 configs which are in interplay here.
> > > > > > > > > > > > > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism> - The default parallelism of data source.
> > > > > > > > > > > > > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism> - Upper bound of allowed parallelism to set adaptively.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism" is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the Flink application fails with the below message:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This is the corresponding code in Flink's DefaultVertexParallelismInfo <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > My question is: shouldn't the "default-source-parallelism" config be independent from the "max-parallelism" config? The former controls the default source parallelism while the latter controls the max number of partitions to write the intermediate shuffle.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > If this is true, then the above check should be fixed. Otherwise, I wanted to understand why "default-source-parallelism" should be less than "max-parallelism".
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > > Venkat
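
For anyone skimming the thread, the check under discussion and the proposed relaxation can be modelled roughly as below. This is a Python sketch and not Flink's code: the `Vertex` class, the `effective_max_parallelism` and `check_vertex` helpers, and the `exempt_sources` flag are all hypothetical names made up for illustration. Only the error message and the fallback to the global max parallelism come from the messages above, and how the actual PR implements the change is not described in this thread.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Vertex:
    """Hypothetical stand-in for a job vertex; not a Flink class."""
    name: str
    is_source: bool
    parallelism: int
    max_parallelism: Optional[int] = None  # None: not set individually


def effective_max_parallelism(vertex: Vertex, global_max: int,
                              exempt_sources: bool) -> int:
    """Pick the limit a vertex is checked against.

    Current behaviour (per the thread): a vertex without its own max
    parallelism falls back to the global max, sources included.
    Proposed behaviour (modelled by the hypothetical exempt_sources flag):
    a source without its own max is not capped by the global max.
    """
    if vertex.max_parallelism is not None:
        return vertex.max_parallelism
    if exempt_sources and vertex.is_source:
        return max(vertex.parallelism, global_max)
    return global_max


def check_vertex(vertex: Vertex, global_max: int,
                 exempt_sources: bool = False) -> int:
    """Return the effective limit, or raise if the parallelism exceeds it."""
    limit = effective_max_parallelism(vertex, global_max, exempt_sources)
    if vertex.parallelism > limit:
        raise ValueError(
            "Vertex's parallelism should be smaller than or equal to "
            "vertex's max parallelism."
        )
    return limit
```

With a global max of 1000, a source configured with parallelism 2000 fails the check under the current behaviour and passes once sources are exempted, while downstream vertices are checked against the global max in both cases.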