Thanks for adding your thoughts to this discussion.

If we all agree that the source vertex parallelism shouldn't be bound by
the downstream max parallelism
(jobmanager.adaptive-batch-scheduler.max-parallelism)
based on the rationale and the issues described above, I can take a stab at
addressing the issue.

Let me file a ticket to track this issue. Otherwise, I'm looking forward to
hearing more thoughts from others as well, especially Lijie and Junrui who
have more context on the AdaptiveBatchScheduler.

Regards
Venkata krishnan


On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:

> Hi Venkat,
> I agree that the parallelism of source vertex should not be upper bounded
> by the job's global max parallelism. The case you mentioned, >> High filter
> selectivity with huge amounts of data to read  excellently supports this
> viewpoint. (In fact, in the current implementation, if the source
> parallelism is pre-specified at job create stage, rather than relying on
> the dynamic parallelism inference of the AdaptiveBatchScheduler, the source
> vertex's parallelism can indeed exceed the job's global max parallelism.)
>
> As Lijie and Junrui pointed out, the key issue is "semantic consistency."
> Currently, if a vertex has not set maxParallelism, the
> AdaptiveBatchScheduler will use
> `execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's
> maxParallelism. Since the current implementation does not distinguish
> between source vertices and downstream vertices, source vertices are also
> subject to this limitation.
>
> Therefore, I believe that if the issue of "semantic consistency" can be
> well explained in the code and configuration documentation, the
> AdaptiveBatchScheduler should support that the parallelism of source
> vertices can exceed the job's global max parallelism.
>
> Best,
> Xia
>
> Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年4月14日周日 10:31写道:
>
> > Let me state why I think "*jobmanager.adaptive-batch-sche*
> > *duler.default-source-parallelism*" should not be bound by the "
> > *jobmanager.adaptive-batch-sche**duler.max-parallelism*".
> >
> >    - Source vertex is unique and does not have any upstream vertices
> >    - Downstream vertices read shuffled data partitioned by key, which is
> >    not the case for the Source vertex
> >    - Limiting source parallelism by downstream vertices' max parallelism
> is
> >    incorrect
> >
> > If we say for ""semantic consistency" the source vertex parallelism has
> to
> > be bound by the overall job's max parallelism, it can lead to following
> > issues:
> >
> >    - High filter selectivity with huge amounts of data to read - setting
> >    high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" so that
> >    source parallelism can be set higher can lead to small blocks and
> >    sub-optimal performance.
> >    - Setting high "*jobmanager.adaptive-batch-scheduler.max-parallelism*"
> >    requires careful tuning of network buffer configurations which is
> >    unnecessary in cases where it is not required just so that the source
> >    parallelism can be set high.
> >
> > Regards
> > Venkata krishnan
> >
> > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:
> >
> > > Hello Venkata krishnan,
> > >
> > > I think the term "semantic inconsistency" defined by
> > > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> > maintaining a
> > > uniform upper limit on parallelism across all vertices within a job. As
> > the
> > > source vertices are part of the global execution graph, they should
> also
> > > respect this rule to ensure consistent application of parallelism
> > > constraints.
> > >
> > > Best,
> > > Junrui
> > >
> > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年4月12日周五 02:10写道:
> > >
> > > > Gentle bump on this question. cc @Becket Qin <becket....@gmail.com>
> as
> > > > well.
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > >
> > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> > > > vsowr...@asu.edu> wrote:
> > > >
> > > > > Thanks for the response Lijie and Junrui. Sorry for the late reply.
> > Few
> > > > > follow up questions.
> > > > >
> > > > > > Source can actually ignore this limit
> > > > > because it has no upstream, but this will lead to semantic
> > > inconsistency.
> > > > >
> > > > > Lijie, can you please elaborate on the above comment further? What
> do
> > > you
> > > > > mean when you say it will lead to "semantic inconsistency"?
> > > > >
> > > > > > Secondly, we first need to limit the max parallelism of
> > (downstream)
> > > > > vertex, and then we can decide how many subpartitions (upstream
> > vertex)
> > > > > should produce. The limit should be effective, otherwise some
> > > downstream
> > > > > tasks will have no data to process.
> > > > >
> > > > > This makes sense in the context of any other vertices other than
> the
> > > > > source vertex. As you mentioned above ("Source can actually ignore
> > this
> > > > > limit because it has no upstream"), therefore I feel "
> > > > > jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> need
> > > not
> > > > > be upper bounded by
> > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > > >
> > > > > Regards
> > > > > Venkata krishnan
> > > > >
> > > > >
> > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hi Venkat,
> > > > >>
> > > > >> As Lijie mentioned,  in Flink, the parallelism is required to be
> > less
> > > > than
> > > > >> or equal to the maximum parallelism. The config option
> > > > >> jobmanager.adaptive-batch-scheduler.max-parallelism and
> > > > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism
> will
> > be
> > > > set
> > > > >> as the source's parallelism and max-parallelism, respectively.
> > > > Therefore,
> > > > >> the check failed situation you encountered is in line with the
> > > > >> expectations.
> > > > >>
> > > > >> Best,
> > > > >> Junrui
> > > > >>
> > > > >> Lijie Wang <wangdachui9...@gmail.com> 于2024年2月29日周四 17:35写道:
> > > > >>
> > > > >> > Hi Venkat,
> > > > >> >
> > > > >> > >> default-source-parallelism config should be independent from
> > the
> > > > >> > max-parallelism
> > > > >> >
> > > > >> > Actually, it's not.
> > > > >> >
> > > > >> > Firstly, it's obvious that the parallelism should be less than
> or
> > > > equal
> > > > >> to
> > > > >> > the max parallelism(both literally and execution). The
> > > > >> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be
> used
> > > as
> > > > >> the
> > > > >> > max parallelism for a vertex if you don't set max parallelism
> for
> > it
> > > > >> > individually (Just like the source in your case).
> > > > >> >
> > > > >> > Secondly, we first need to limit the max parallelism of
> > (downstream)
> > > > >> > vertex, and then we can decide how many subpartitions (upstream
> > > > vertex)
> > > > >> > should produce. The limit should be effective, otherwise some
> > > > downstream
> > > > >> > tasks will have no data to process. Source can actually ignore
> > this
> > > > >> limit
> > > > >> > because it has no upstream, but this will lead to semantic
> > > > >> inconsistency.
> > > > >> >
> > > > >> > Best,
> > > > >> > Lijie
> > > > >> >
> > > > >> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年2月29日周四
> > > 05:49写道:
> > > > >> >
> > > > >> > > Hi Flink devs,
> > > > >> > >
> > > > >> > > With Flink's AdaptiveBatchScheduler
> > > > >> > > <
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-batch-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISrg5BrHLw$
> > > > >> > > >
> > > > >> > > (Note:
> > > > >> > > this is different from AdaptiveScheduler
> > > > >> > > <
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqUzURivw$
> > > > >> > > >),
> > > > >> > > the scheduler automatically determines the correct number of
> > > > >> downstream
> > > > >> > > tasks required to process the shuffle generated by the
> upstream
> > > > >> vertex.
> > > > >> > >
> > > > >> > > I have a question regarding the current behavior. There are 2
> > > > configs
> > > > >> > which
> > > > >> > > are in interplay here.
> > > > >> > > 1.
> > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > > >> > > <
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
> > > > >> > > >
> > > > >> > >  - The default parallelism of data source.
> > > > >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > > > >> > > <
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
> > > > >> > > >
> > > > >> > > -
> > > > >> > > Upper bound of allowed parallelism to set adaptively.
> > > > >> > >
> > > > >> > > Currently, if "
> > > > >> > > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > > >> > > <
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
> > > > >> > > >"
> > > > >> > > is greater than
> > > "jobmanager.adaptive-batch-scheduler.max-parallelism
> > > > >> > > <
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
> > > > >> > > >",
> > > > >> > > Flink application fails with the below message:
> > > > >> > >
> > > > >> > > "Vertex's parallelism should be smaller than or equal to
> > vertex's
> > > > max
> > > > >> > > parallelism."
> > > > >> > >
> > > > >> > > This is the corresponding code in Flink's
> > > > DefaultVertexParallelismInfo
> > > > >> > > <
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://urldefense.com/v3/__https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java*L110__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqBRDEfwA$
> > > > >> > > >.
> > > > >> > > My question is, "default-source-parallelism" config should be
> > > > >> independent
> > > > >> > > from the "max-parallelism" flag. The former controls the
> default
> > > > >> source
> > > > >> > > parallelism while the latter controls the max number of
> > partitions
> > > > to
> > > > >> > write
> > > > >> > > the intermediate shuffle.
> > > > >> > >
> > > > >> > > If this is true, then the above check should be fixed.
> > Otherwise,
> > > > >> wanted
> > > > >> > to
> > > > >> > > understand why the "default-source-parallelism` should be less
> > > than
> > > > >> the
> > > > >> > > "max-parallelism"
> > > > >> > >
> > > > >> > > Thanks
> > > > >> > > Venkat
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>

Reply via email to