Hi Xia and Junrui,

Filed https://github.com/apache/flink/pull/24736 to address the above-described issue. Please take a look whenever you can.
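For readers joining the thread here: the conflict under discussion arises when the two adaptive batch scheduler options below are set as follows. This is an illustrative flink-conf.yaml fragment; the values are made up for demonstration and do not come from the thread.

```yaml
# Illustrative flink-conf.yaml fragment (values are hypothetical).
# With the check discussed below, this combination fails at job submission,
# because the source's default parallelism (2000) exceeds the job-wide
# adaptive max parallelism (1000):
#   "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 2000
jobmanager.adaptive-batch-scheduler.max-parallelism: 1000
```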
Thanks
Venkat

On Thu, Apr 18, 2024 at 12:16 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the
> above-described issue. Will share the PR here once it is ready for review.
>
> Regards
> Venkata krishnan
>
> On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com> wrote:
>
> > Thanks Venkata and Xia for providing further clarification. I think your
> > example illustrates the significance of this proposal very well. Please
> > feel free to go ahead and address the concerns.
> >
> > Best,
> > Junrui
> >
> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Tue, Apr 16, 2024 at 07:01:
> >
> > > Thanks for adding your thoughts to this discussion.
> > >
> > > If we all agree that the source vertex parallelism shouldn't be bound
> > > by the downstream max parallelism
> > > (jobmanager.adaptive-batch-scheduler.max-parallelism), based on the
> > > rationale and the issues described above, I can take a stab at
> > > addressing the issue.
> > >
> > > Let me file a ticket to track this issue. Otherwise, I'm looking
> > > forward to hearing more thoughts from others as well, especially Lijie
> > > and Junrui, who have more context on the AdaptiveBatchScheduler.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > > On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:
> > >
> > > > Hi Venkat,
> > > > I agree that the parallelism of the source vertex should not be
> > > > upper bounded by the job's global max parallelism. The case you
> > > > mentioned, high filter selectivity with huge amounts of data to
> > > > read, supports this viewpoint very well. (In fact, in the current
> > > > implementation, if the source parallelism is pre-specified at job
> > > > creation time, rather than relying on the dynamic parallelism
> > > > inference of the AdaptiveBatchScheduler, the source vertex's
> > > > parallelism can indeed exceed the job's global max parallelism.)
> > > > As Lijie and Junrui pointed out, the key issue is "semantic
> > > > consistency." Currently, if a vertex has not set maxParallelism,
> > > > the AdaptiveBatchScheduler will use
> > > > `execution.batch.adaptive.auto-parallelism.max-parallelism` as the
> > > > vertex's maxParallelism. Since the current implementation does not
> > > > distinguish between source vertices and downstream vertices, source
> > > > vertices are also subject to this limitation.
> > > >
> > > > Therefore, I believe that if the issue of "semantic consistency"
> > > > can be well explained in the code and configuration documentation,
> > > > the AdaptiveBatchScheduler should allow the parallelism of source
> > > > vertices to exceed the job's global max parallelism.
> > > >
> > > > Best,
> > > > Xia
> > > >
> > > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Sun, Apr 14, 2024 at 10:31:
> > > >
> > > > > Let me state why I think
> > > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > > > should not be bound by
> > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism":
> > > > >
> > > > > - The source vertex is unique and does not have any upstream
> > > > >   vertices.
> > > > > - Downstream vertices read shuffled data partitioned by key,
> > > > >   which is not the case for the source vertex.
> > > > > - Limiting source parallelism by the downstream vertices' max
> > > > >   parallelism is incorrect.
> > > > >
> > > > > If we say that, for "semantic consistency", the source vertex
> > > > > parallelism has to be bound by the overall job's max parallelism,
> > > > > it can lead to the following issues:
> > > > >
> > > > > - High filter selectivity with huge amounts of data to read:
> > > > >   setting a high
> > > > >   "jobmanager.adaptive-batch-scheduler.max-parallelism" so that
> > > > >   the source parallelism can be set higher can lead to small
> > > > >   blocks and sub-optimal performance.
> > > > > - Setting a high
> > > > >   "jobmanager.adaptive-batch-scheduler.max-parallelism" requires
> > > > >   careful tuning of network buffer configurations, which is
> > > > >   unnecessary where it is not otherwise needed, just so that the
> > > > >   source parallelism can be set high.
> > > > >
> > > > > Regards
> > > > > Venkata krishnan
> > > > >
> > > > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:
> > > > >
> > > > > > Hello Venkata krishnan,
> > > > > >
> > > > > > I think the term "semantic inconsistency" as defined by
> > > > > > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> > > > > > maintaining a uniform upper limit on parallelism across all
> > > > > > vertices within a job. As the source vertices are part of the
> > > > > > global execution graph, they should also respect this rule to
> > > > > > ensure consistent application of parallelism constraints.
> > > > > >
> > > > > > Best,
> > > > > > Junrui
> > > > > >
> > > > > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Fri, Apr 12, 2024 at 02:10:
> > > > > >
> > > > > > > Gentle bump on this question. cc @Becket Qin
> > > > > > > <becket....@gmail.com> as well.
> > > > > > >
> > > > > > > Regards
> > > > > > > Venkata krishnan
> > > > > > >
> > > > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan
> > > > > > > <vsowr...@asu.edu> wrote:
> > > > > > >
> > > > > > > > Thanks for the response, Lijie and Junrui. Sorry for the
> > > > > > > > late reply. A few follow-up questions.
> > > > > > > >
> > > > > > > > > Source can actually ignore this limit because it has no
> > > > > > > > > upstream, but this will lead to semantic inconsistency.
> > > > > > > >
> > > > > > > > Lijie, can you please elaborate on the above comment
> > > > > > > > further? What do you mean when you say it will lead to
> > > > > > > > "semantic inconsistency"?
> > > > > > > > > Secondly, we first need to limit the max parallelism of
> > > > > > > > > the (downstream) vertex, and then we can decide how many
> > > > > > > > > subpartitions the (upstream) vertex should produce. The
> > > > > > > > > limit should be effective, otherwise some downstream
> > > > > > > > > tasks will have no data to process.
> > > > > > > >
> > > > > > > > This makes sense in the context of any vertex other than
> > > > > > > > the source vertex. As you mentioned above ("Source can
> > > > > > > > actually ignore this limit because it has no upstream"), I
> > > > > > > > therefore feel that
> > > > > > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > > > > > > need not be upper bounded by
> > > > > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Venkata krishnan
> > > > > > > >
> > > > > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee
> > > > > > > > <jrlee....@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Venkat,
> > > > > > > > >
> > > > > > > > > As Lijie mentioned, in Flink the parallelism is required
> > > > > > > > > to be less than or equal to the maximum parallelism. The
> > > > > > > > > config options
> > > > > > > > > jobmanager.adaptive-batch-scheduler.max-parallelism and
> > > > > > > > > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > > > > > > > will be set as the source's max parallelism and
> > > > > > > > > parallelism, respectively. Therefore, the check failure
> > > > > > > > > you encountered is in line with expectations.
> > > > > > > > > Best,
> > > > > > > > > Junrui
> > > > > > > > >
> > > > > > > > > Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Feb 29, 2024 at 17:35:
> > > > > > > > >
> > > > > > > > > > Hi Venkat,
> > > > > > > > > >
> > > > > > > > > > > default-source-parallelism config should be
> > > > > > > > > > > independent from the max-parallelism
> > > > > > > > > >
> > > > > > > > > > Actually, it's not.
> > > > > > > > > >
> > > > > > > > > > Firstly, it's obvious that the parallelism should be
> > > > > > > > > > less than or equal to the max parallelism (both
> > > > > > > > > > literally and in execution). The
> > > > > > > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism"
> > > > > > > > > > will be used as the max parallelism for a vertex if you
> > > > > > > > > > don't set a max parallelism for it individually (just
> > > > > > > > > > like the source in your case).
> > > > > > > > > >
> > > > > > > > > > Secondly, we first need to limit the max parallelism of
> > > > > > > > > > the (downstream) vertex, and then we can decide how
> > > > > > > > > > many subpartitions the (upstream) vertex should
> > > > > > > > > > produce. The limit should be effective, otherwise some
> > > > > > > > > > downstream tasks will have no data to process. The
> > > > > > > > > > source can actually ignore this limit because it has no
> > > > > > > > > > upstream, but this would lead to semantic
> > > > > > > > > > inconsistency.
> > > > > > > > > > Best,
> > > > > > > > > > Lijie
> > > > > > > > > >
> > > > > > > > > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:
> > > > > > > > > >
> > > > > > > > > > > Hi Flink devs,
> > > > > > > > > > >
> > > > > > > > > > > With Flink's AdaptiveBatchScheduler
> > > > > > > > > > > (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler;
> > > > > > > > > > > note: this is different from the AdaptiveScheduler,
> > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler),
> > > > > > > > > > > the scheduler automatically determines the correct
> > > > > > > > > > > number of downstream tasks required to process the
> > > > > > > > > > > shuffle generated by the upstream vertex.
> > > > > > > > > > >
> > > > > > > > > > > I have a question regarding the current behavior.
> > > > > > > > > > > There are two configs in interplay here:
> > > > > > > > > > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > > > > > > > > >    (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism):
> > > > > > > > > > >    the default parallelism of data sources.
> > > > > > > > > > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > > > > > > > > > >    (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism):
> > > > > > > > > > >    the upper bound of allowed parallelism to set
> > > > > > > > > > >    adaptively.
> > > > > > > > > > >
> > > > > > > > > > > Currently, if
> > > > > > > > > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > > > > > > > > > is greater than
> > > > > > > > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism",
> > > > > > > > > > > the Flink application fails with the message:
> > > > > > > > > > >
> > > > > > > > > > > "Vertex's parallelism should be smaller than or equal
> > > > > > > > > > > to vertex's max parallelism."
> > > > > > > > > > >
> > > > > > > > > > > This is the corresponding check in Flink's
> > > > > > > > > > > DefaultVertexParallelismInfo
> > > > > > > > > > > (https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110).
> > > > > > > > > > >
> > > > > > > > > > > My question is: shouldn't the
> > > > > > > > > > > "default-source-parallelism" config be independent of
> > > > > > > > > > > the "max-parallelism" config? The former controls the
> > > > > > > > > > > default source parallelism, while the latter controls
> > > > > > > > > > > the max number of partitions to write the
> > > > > > > > > > > intermediate shuffle to.
> > > > > > > > > > >
> > > > > > > > > > > If this is true, then the above check should be
> > > > > > > > > > > fixed. Otherwise, I wanted to understand why
> > > > > > > > > > > "default-source-parallelism" should be less than
> > > > > > > > > > > "max-parallelism".
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > > Venkat
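[Editorial note appended to the thread] To make the failing check concrete, here is a minimal, self-contained sketch of the kind of validation the thread describes. This is not Flink's actual DefaultVertexParallelismInfo code; the class and method names below are illustrative, and only the error message is taken from the thread.

```java
// Minimal sketch (NOT Flink's actual code) of the parallelism invariant
// discussed in this thread: a vertex's parallelism must not exceed its max
// parallelism. When default-source-parallelism > max-parallelism, the source
// vertex trips exactly this kind of check at job submission time.
public class ParallelismCheckSketch {

    // Illustrative stand-in for the check in DefaultVertexParallelismInfo.
    static void checkVertexParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
    }

    public static void main(String[] args) {
        // Equal values are allowed: passes silently.
        checkVertexParallelism(128, 128);

        try {
            // e.g. default-source-parallelism = 200, max-parallelism = 128,
            // mirroring the failure reported at the start of the thread.
            checkVertexParallelism(200, 128);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Under the change proposed in the thread (FLINK-35165 / PR 24736), a check like this would presumably be relaxed or skipped for source vertices, since they have no upstream producing subpartitions for them.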