Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as well.

Regards
Venkata krishnan


On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
vsowr...@asu.edu> wrote:

> Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few
> follow-up questions.
>
> > Source can actually ignore this limit
> because it has no upstream, but this will lead to semantic inconsistency.
>
> Lijie, can you please elaborate on the above comment further? What do you
> mean when you say it will lead to "semantic inconsistency"?
>
> > Secondly, we first need to limit the max parallelism of (downstream)
> vertex, and then we can decide how many subpartitions (upstream vertex)
> should produce. The limit should be effective, otherwise some downstream
> tasks will have no data to process.
>
> This makes sense for all vertices other than the source vertex. As you
> mentioned above ("Source can actually ignore this limit because it has no
> upstream"), I feel
> "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not
> be upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
>
> Regards
> Venkata krishnan
>
>
> On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:
>
>> Hi Venkat,
>>
>> As Lijie mentioned, in Flink the parallelism is required to be less than
>> or equal to the maximum parallelism. The config options
>> jobmanager.adaptive-batch-scheduler.default-source-parallelism and
>> jobmanager.adaptive-batch-scheduler.max-parallelism will be used as the
>> source's parallelism and max parallelism, respectively. Therefore, the
>> check failure you encountered is in line with expectations.
>>
>> Best,
>> Junrui
>>
>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Feb 29, 2024 at 17:35:
>>
>> > Hi Venkat,
>> >
>> > >> default-source-parallelism config should be independent from the
>> > max-parallelism
>> >
>> > Actually, it's not.
>> >
>> > Firstly, it's obvious that the parallelism should be less than or equal
>> > to the max parallelism (both by definition and at execution time). The
>> > "jobmanager.adaptive-batch-scheduler.max-parallelism" value will be used
>> > as the max parallelism for a vertex if you don't set a max parallelism
>> > for it individually (just like the source in your case).
>> >
>> > Secondly, we first need to limit the max parallelism of the (downstream)
>> > vertex, and only then can we decide how many subpartitions the (upstream)
>> > vertex should produce. The limit should be effective; otherwise some
>> > downstream tasks will have no data to process. The source can actually
>> > ignore this limit because it has no upstream, but doing so would lead to
>> > semantic inconsistency.
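The "no data to process" point above can be sketched as follows. This is a simplification with invented names, not Flink code: if the upstream produces fewer subpartitions than the downstream has tasks, some downstream tasks receive nothing.

```java
// Illustration (invented names): why downstream parallelism must not
// exceed the number of subpartitions the upstream produces.
public class SubpartitionAssignmentSketch {

    // Count how many downstream tasks receive at least one subpartition
    // under a simple one-subpartition-per-task assignment.
    static int tasksWithData(int numSubpartitions, int downstreamParallelism) {
        return Math.min(numSubpartitions, downstreamParallelism);
    }

    public static void main(String[] args) {
        // 8 subpartitions but 16 downstream tasks: 8 tasks sit idle.
        System.out.println(16 - tasksWithData(8, 16)); // prints "8"
    }
}
```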
>> >
>> > Best,
>> > Lijie
>> >
>> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:
>> >
>> > > Hi Flink devs,
>> > >
>> > > With Flink's AdaptiveBatchScheduler
>> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
>> > > (Note: this is different from AdaptiveScheduler
>> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
>> > > the scheduler automatically determines the correct number of downstream
>> > > tasks required to process the shuffle generated by the upstream vertex.
>> > >
>> > > I have a question regarding the current behavior. There are 2 configs
>> > which
>> > > are in interplay here.
>> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > > <
>> > >
>> >
>> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
>> > > >
>> > >  - The default parallelism of data source.
>> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
>> > > <
>> > >
>> >
>> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
>> > > >
>> > > -
>> > > Upper bound of allowed parallelism to set adaptively.
>> > >
>> > > Currently, if
>> > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism" is
>> > > greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the
>> > > Flink application fails with the below message:
>> > >
>> > > "Vertex's parallelism should be smaller than or equal to vertex's max
>> > > parallelism."
>> > >
>> > > This is the corresponding check in Flink's DefaultVertexParallelismInfo
>> > > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
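For readers following along, the check in question behaves roughly like the sketch below. This is a simplification of the validation in DefaultVertexParallelismInfo, not the actual Flink code, and the class and method names here are invented:

```java
// Simplified sketch of the vertex parallelism validation discussed above.
// Not the real Flink implementation; names are invented for illustration.
public class VertexParallelismCheckSketch {

    // Flink requires every vertex's parallelism, including the source's,
    // to be positive and no greater than its max parallelism.
    static boolean isValid(int parallelism, int maxParallelism) {
        return parallelism > 0 && parallelism <= maxParallelism;
    }

    public static void main(String[] args) {
        // default-source-parallelism = 128 with max-parallelism = 64
        // fails the check, producing the error reported in this thread.
        System.out.println(isValid(128, 64)); // prints "false"
        System.out.println(isValid(64, 128)); // prints "true"
    }
}
```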
>> > > My question is: shouldn't the "default-source-parallelism" config be
>> > > independent from the "max-parallelism" config? The former controls the
>> > > default source parallelism, while the latter controls the max number of
>> > > partitions to write for the intermediate shuffle.
>> > >
>> > > If this is true, then the above check should be fixed. Otherwise, I
>> > > wanted to understand why "default-source-parallelism" should be less
>> > > than "max-parallelism".
>> > >
>> > > Thanks
>> > > Venkat
>> > >
>> >
>>
>
