Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as well.
Regards
Venkata krishnan

On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few
> follow-up questions.
>
> > Source can actually ignore this limit because it has no upstream, but
> > this will lead to semantic inconsistency.
>
> Lijie, can you please elaborate on the above comment? What do you mean
> when you say it will lead to "semantic inconsistency"?
>
> > Secondly, we first need to limit the max parallelism of the (downstream)
> > vertex, and then we can decide how many subpartitions the (upstream)
> > vertex should produce. The limit should be effective, otherwise some
> > downstream tasks will have no data to process.
>
> This makes sense for every vertex other than the source vertex. Since, as
> you mentioned above, "Source can actually ignore this limit because it has
> no upstream", I feel
> "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not
> be upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
>
> Regards
> Venkata krishnan
>
>
> On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:
>
>> Hi Venkat,
>>
>> As Lijie mentioned, in Flink the parallelism is required to be less than
>> or equal to the maximum parallelism. The values of
>> jobmanager.adaptive-batch-scheduler.default-source-parallelism and
>> jobmanager.adaptive-batch-scheduler.max-parallelism will be used as the
>> source's parallelism and max parallelism, respectively. Therefore, the
>> failed check you encountered is in line with expectations.
>>
>> Best,
>> Junrui
>>
>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Feb 29, 2024 at 17:35:
>>
>> > Hi Venkat,
>> >
>> > > default-source-parallelism config should be independent from the
>> > > max-parallelism
>> >
>> > Actually, it's not.
>> >
>> > Firstly, it's obvious that the parallelism should be less than or equal
>> > to the max parallelism (both literally and in execution).
>> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as
>> > the max parallelism for a vertex if you don't set a max parallelism for
>> > it individually (just like the source in your case).
>> >
>> > Secondly, we first need to limit the max parallelism of the (downstream)
>> > vertex, and then we can decide how many subpartitions the (upstream)
>> > vertex should produce. The limit should be effective, otherwise some
>> > downstream tasks will have no data to process. Source can actually
>> > ignore this limit because it has no upstream, but this will lead to
>> > semantic inconsistency.
>> >
>> > Best,
>> > Lijie
>> >
>> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:
>> >
>> > > Hi Flink devs,
>> > >
>> > > With Flink's AdaptiveBatchScheduler
>> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
>> > > (Note: this is different from AdaptiveScheduler
>> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
>> > > the scheduler automatically determines the correct number of
>> > > downstream tasks required to process the shuffle generated by the
>> > > upstream vertex.
>> > >
>> > > I have a question regarding the current behavior. There are two
>> > > configs in interplay here:
>> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
>> > > - the default parallelism of data sources.
>> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
>> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
>> > > - the upper bound of allowed parallelism to set adaptively.
>> > >
>> > > Currently, if
>> > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism" is
>> > > greater than "jobmanager.adaptive-batch-scheduler.max-parallelism",
>> > > the Flink application fails with the message:
>> > >
>> > > "Vertex's parallelism should be smaller than or equal to vertex's max
>> > > parallelism."
>> > >
>> > > This is the corresponding check in Flink's DefaultVertexParallelismInfo
>> > > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
>> > > My question: shouldn't the "default-source-parallelism" config be
>> > > independent of the "max-parallelism" config? The former controls the
>> > > default source parallelism, while the latter controls the max number
>> > > of partitions to write the intermediate shuffle to.
>> > >
>> > > If this is true, then the above check should be fixed. Otherwise, I
>> > > wanted to understand why "default-source-parallelism" must not exceed
>> > > "max-parallelism".
>> > >
>> > > Thanks
>> > > Venkat
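
To make the interplay discussed above concrete, here is a rough, self-contained sketch. It is not Flink's actual scheduler code; the class and method names (AdaptiveBatchParallelismSketch, deriveSourceParallelism) and the numbers in main are made up for illustration. It only mirrors what the thread describes: the scheduler falls back to jobmanager.adaptive-batch-scheduler.max-parallelism as a vertex's max parallelism when none is set individually, and the check in DefaultVertexParallelismInfo then rejects a parallelism larger than that bound.

// Hypothetical, simplified illustration of the config interplay; not Flink code.
public class AdaptiveBatchParallelismSketch {

    static int deriveSourceParallelism(int defaultSourceParallelism, int schedulerMaxParallelism) {
        // Per the thread: a vertex with no individually configured max parallelism
        // (the source here) falls back to the scheduler-wide max-parallelism.
        int vertexMaxParallelism = schedulerMaxParallelism;
        int vertexParallelism = defaultSourceParallelism;

        // This mirrors the sanity check the thread points at in
        // DefaultVertexParallelismInfo: parallelism must not exceed max parallelism.
        if (vertexParallelism > vertexMaxParallelism) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
        return vertexParallelism;
    }

    public static void main(String[] args) {
        // Hypothetical values:
        // jobmanager.adaptive-batch-scheduler.default-source-parallelism = 2000
        // jobmanager.adaptive-batch-scheduler.max-parallelism = 1000
        // -> fails the check, which is the situation described in the original mail.
        deriveSourceParallelism(2000, 1000);
    }
}

With the hypothetical values above (2000 > 1000), the sketch fails with the same message quoted in the original mail, matching the behavior Junrui described.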
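A second rough sketch of why, per Lijie's explanation, the bound has to be effective for non-source vertices: the upstream vertex produces as many subpartitions as the downstream max parallelism, and each downstream subtask is later assigned a slice of those subpartitions. The names and the even range-splitting below are simplifications for illustration, not Flink's actual assignment logic.

// Hypothetical sketch: upstream result sized to the downstream max parallelism,
// downstream subtasks each consuming a contiguous range of subpartitions.
import java.util.ArrayList;
import java.util.List;

public class SubpartitionAssignmentSketch {

    /** Evenly assigns numSubpartitions to parallelism consumer subtasks. */
    static List<int[]> assignSubpartitionRanges(int numSubpartitions, int parallelism) {
        List<int[]> ranges = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            int start = (int) ((long) task * numSubpartitions / parallelism);
            int end = (int) ((long) (task + 1) * numSubpartitions / parallelism); // exclusive
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Upstream produced subpartitions = downstream max parallelism (here 4).
        // If the decided downstream parallelism exceeded it (e.g. 6), some subtasks
        // would get an empty range [start, end) and thus no data to process.
        assignSubpartitionRanges(4, 6).forEach(r ->
                System.out.println("subtask range: [" + r[0] + ", " + r[1] + ")"));
    }
}

Running this with 4 subpartitions and a deliberately too-large parallelism of 6 leaves two subtasks with empty ranges, i.e. no data to process, which is exactly what the limit prevents for non-source vertices.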