Thanks for the responses, Lijie and Junrui, and sorry for the late reply. A few
follow-up questions.

> Source can actually ignore this limit
> because it has no upstream, but this will lead to semantic inconsistency.

Lijie, can you please elaborate on the above comment further? What do you
mean when you say it will lead to "semantic inconsistency"?

> Secondly, we first need to limit the max parallelism of (downstream)
> vertex, and then we can decide how many subpartitions (upstream vertex)
> should produce. The limit should be effective, otherwise some downstream
> tasks will have no data to process.

This makes sense for every vertex other than the source vertex. As you
mentioned above ("Source can actually ignore this limit because it has no
upstream"), I therefore feel that
"jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not be
upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
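To illustrate why the limit matters for vertices that do have an upstream,
here is a minimal Java sketch. It is a simplified model, not Flink's
implementation: it assumes the upstream writes exactly max-parallelism
subpartitions and that the downstream tasks split them into contiguous,
near-equal ranges. `assignRanges` and `idleTasks` are hypothetical helpers.
When the downstream parallelism exceeds the number of subpartitions, some
tasks end up with an empty range, i.e. no data to process.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of how downstream tasks could split the
// subpartitions produced by an upstream vertex. Not Flink's actual logic.
public class SubpartitionAssignmentSketch {

    /** Half-open range [start, end) of subpartition indexes read by one task. */
    record Range(int start, int end) {
        int size() {
            return end - start;
        }
    }

    /**
     * Splits numSubpartitions into `parallelism` contiguous ranges whose sizes
     * differ by at most one. If parallelism > numSubpartitions, some ranges are empty.
     */
    static List<Range> assignRanges(int numSubpartitions, int parallelism) {
        List<Range> ranges = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            // Use long arithmetic to avoid overflow for large values.
            int start = (int) ((long) task * numSubpartitions / parallelism);
            int end = (int) ((long) (task + 1) * numSubpartitions / parallelism);
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    /** Number of downstream tasks that would receive no subpartition at all. */
    static long idleTasks(int numSubpartitions, int parallelism) {
        return assignRanges(numSubpartitions, parallelism).stream()
                .filter(r -> r.size() == 0)
                .count();
    }

    public static void main(String[] args) {
        int maxParallelism = 4; // the upstream produces this many subpartitions

        // Downstream parallelism within the limit: every task reads something.
        System.out.println(assignRanges(maxParallelism, 3));

        // Downstream parallelism above the limit: some tasks stay idle.
        System.out.println(assignRanges(maxParallelism, 6)
                + " idle=" + idleTasks(maxParallelism, 6));
    }
}
```

A source has no upstream subpartitions to split, which is why the same
concern does not obviously apply to it.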

Regards
Venkata krishnan


On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:

> Hi Venkat,
>
> As Lijie mentioned, in Flink the parallelism is required to be less than
> or equal to the maximum parallelism. The config options
> jobmanager.adaptive-batch-scheduler.default-source-parallelism and
> jobmanager.adaptive-batch-scheduler.max-parallelism will be set as the
> source's parallelism and max parallelism, respectively. Therefore, the
> failed check you encountered is expected behavior.
>
> Best,
> Junrui
>
> On Thu, Feb 29, 2024 at 5:35 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
>
> > Hi Venkat,
> >
> > >> default-source-parallelism config should be independent from the
> > >> max-parallelism
> >
> > Actually, it's not.
> >
> > Firstly, it's obvious that the parallelism should be less than or equal
> > to the max parallelism (both by definition and during execution). The
> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the
> > max parallelism for a vertex if you don't set max parallelism for it
> > individually (just like the source in your case).
> >
> > Secondly, we first need to limit the max parallelism of (downstream)
> > vertex, and then we can decide how many subpartitions (upstream vertex)
> > should produce. The limit should be effective, otherwise some downstream
> > tasks will have no data to process. Source can actually ignore this limit
> > because it has no upstream, but this will lead to semantic inconsistency.
> >
> > Best,
> > Lijie
> >
> > On Thu, Feb 29, 2024 at 5:49 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> >
> > > Hi Flink devs,
> > >
> > > With Flink's AdaptiveBatchScheduler
> > > <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-batch-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISrg5BrHLw$>
> > > (Note: this is different from AdaptiveScheduler
> > > <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqUzURivw$>),
> > > the scheduler automatically determines the correct number of downstream
> > > tasks required to process the shuffle generated by the upstream vertex.
> > >
> > > I have a question regarding the current behavior. Two configs interact
> > > here:
> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > >    <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$>
> > >    - The default parallelism of the data source.
> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > >    <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$>
> > >    - Upper bound of allowed parallelism to set adaptively.
> > >
> > > Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism",
> > > the Flink application fails with the following message:
> > >
> > > "Vertex's parallelism should be smaller than or equal to vertex's max
> > > parallelism."
> > >
> > > This is the corresponding code in Flink's DefaultVertexParallelismInfo
> > > <https://urldefense.com/v3/__https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java*L110__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqBRDEfwA$>.
> > > My understanding is that the "default-source-parallelism" config should
> > > be independent of the "max-parallelism" config. The former controls the
> > > default source parallelism, while the latter controls the maximum number
> > > of partitions written for the intermediate shuffle.
> > >
> > > If so, the above check should be fixed. Otherwise, I would like to
> > > understand why "default-source-parallelism" must be less than or equal
> > > to "max-parallelism".
> > >
> > > Thanks
> > > Venkat
> > >
> >
>
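
For reference, the conflict reported in this thread can be modeled with the
Java sketch below. It does not use Flink's classes; `VertexParallelism`,
`currentSourceSettings` and `proposedSourceSettings` are hypothetical names.
The current behavior gives a source without explicit settings the two config
values as its parallelism and max parallelism, and a precondition similar to
the one in DefaultVertexParallelismInfo rejects a parallelism greater than the
max parallelism. The "proposed" variant is only an assumption about what
dropping the upper bound for sources could look like, not an agreed design.

```java
// Hypothetical model of the source-vertex parallelism check discussed in this thread.
// Not Flink code: names and structure are illustrative assumptions.
public class SourceParallelismSketch {

    /** Parallelism settings for one vertex; enforces parallelism <= maxParallelism. */
    record VertexParallelism(int parallelism, int maxParallelism) {
        VertexParallelism {
            if (parallelism <= 0 || maxParallelism <= 0) {
                throw new IllegalArgumentException("Parallelism values must be positive.");
            }
            // Mirrors the intent of the check that produces the error quoted above.
            if (parallelism > maxParallelism) {
                throw new IllegalArgumentException(
                        "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
            }
        }
    }

    /** Current behavior: the source inherits both config values unchanged. */
    static VertexParallelism currentSourceSettings(int defaultSourceParallelism, int maxParallelism) {
        return new VertexParallelism(defaultSourceParallelism, maxParallelism);
    }

    /**
     * Hypothetical relaxation: a source has no upstream, so its max parallelism
     * is raised to at least its parallelism instead of failing the check.
     */
    static VertexParallelism proposedSourceSettings(int defaultSourceParallelism, int maxParallelism) {
        return new VertexParallelism(
                defaultSourceParallelism, Math.max(defaultSourceParallelism, maxParallelism));
    }

    public static void main(String[] args) {
        int defaultSourceParallelism = 2000; // example values, not Flink defaults
        int maxParallelism = 1000;

        try {
            currentSourceSettings(defaultSourceParallelism, maxParallelism);
            System.out.println("current: accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("current: rejected (" + e.getMessage() + ")");
        }

        VertexParallelism relaxed = proposedSourceSettings(defaultSourceParallelism, maxParallelism);
        System.out.println("proposed: parallelism=" + relaxed.parallelism()
                + ", maxParallelism=" + relaxed.maxParallelism());
    }
}
```

With the example values in `main` (2000 and 1000, chosen only for
illustration), the current variant rejects the source, while the hypothetical
variant accepts it with a max parallelism of 2000.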
