Hi Xia and Junrui,

Filed https://github.com/apache/flink/pull/24736 to address the issue
described above. Please take a look whenever you can.

Thanks
Venkat


On Thu, Apr 18, 2024 at 12:16 PM Venkatakrishnan Sowrirajan <
vsowr...@asu.edu> wrote:

> Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the
> issue described above. I will share the PR here once it is ready for review.
>
> Regards
> Venkata krishnan
>
>
> On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com> wrote:
>
>> Thanks Venkata and Xia for providing further clarification. I think your
>> example illustrates the significance of this proposal very well. Please
>> feel free to go ahead and address the concerns.
>>
>> Best,
>> Junrui
>>
>> Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Tue, Apr 16, 2024 at 07:01:
>>
>> > Thanks for adding your thoughts to this discussion.
>> >
>> > If we all agree that the source vertex parallelism shouldn't be bound by
>> > the downstream max parallelism
>> > (jobmanager.adaptive-batch-scheduler.max-parallelism)
>> > based on the rationale and the issues described above, I can take a stab
>> > at addressing the issue.
>> >
>> > Let me file a ticket to track this issue. Otherwise, I'm looking forward
>> > to hearing more thoughts from others as well, especially Lijie and Junrui,
>> > who have more context on the AdaptiveBatchScheduler.
>> >
>> > Regards
>> > Venkata krishnan
>> >
>> >
>> > On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:
>> >
>> > > Hi Venkat,
>> > > I agree that the parallelism of the source vertex should not be upper
>> > > bounded by the job's global max parallelism. The case you mentioned,
>> > > "High filter selectivity with huge amounts of data to read", supports
>> > > this viewpoint excellently. (In fact, in the current implementation, if
>> > > the source parallelism is pre-specified at the job creation stage, rather
>> > > than relying on the dynamic parallelism inference of the
>> > > AdaptiveBatchScheduler, the source vertex's parallelism can indeed exceed
>> > > the job's global max parallelism.)
>> > >
>> > > As Lijie and Junrui pointed out, the key issue is "semantic consistency."
>> > > Currently, if a vertex has not set maxParallelism, the
>> > > AdaptiveBatchScheduler will use
>> > > `execution.batch.adaptive.auto-parallelism.max-parallelism` as the
>> > > vertex's maxParallelism. Since the current implementation does not
>> > > distinguish between source vertices and downstream vertices, source
>> > > vertices are also subject to this limitation.
>> > >
>> > > Therefore, I believe that if the issue of "semantic consistency" can be
>> > > well explained in the code and configuration documentation, the
>> > > AdaptiveBatchScheduler should allow the parallelism of source vertices
>> > > to exceed the job's global max parallelism.
>> > >
>> > > Best,
>> > > Xia
>> > >
>> > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Sun, Apr 14, 2024 at 10:31:
>> > >
>> > > > Let me state why I think
>> > > > "*jobmanager.adaptive-batch-scheduler.default-source-parallelism*"
>> > > > should not be bound by
>> > > > "*jobmanager.adaptive-batch-scheduler.max-parallelism*":
>> > > >
>> > > >    - The source vertex is unique and does not have any upstream
>> > > >    vertices.
>> > > >    - Downstream vertices read shuffled data partitioned by key, which is
>> > > >    not the case for the source vertex.
>> > > >    - Limiting source parallelism by the downstream vertices' max
>> > > >    parallelism is incorrect.
>> > > >
>> > > > If we say that, for "semantic consistency," the source vertex
>> > > > parallelism has to be bound by the overall job's max parallelism, it
>> > > > can lead to the following issues:
>> > > >
>> > > >    - High filter selectivity with huge amounts of data to read: setting
>> > > >    a high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" just
>> > > >    so that source parallelism can be set higher can lead to small blocks
>> > > >    and sub-optimal performance.
>> > > >    - Setting a high
>> > > >    "*jobmanager.adaptive-batch-scheduler.max-parallelism*" requires
>> > > >    careful tuning of network buffer configurations, which is unnecessary
>> > > >    when the only goal is to allow a high source parallelism.
>> > > >
>> > > > Regards
>> > > > Venkata krishnan
>> > > >
>> > > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:
>> > > >
>> > > > > Hello Venkata krishnan,
>> > > > >
>> > > > > I think the "semantic consistency" associated with
>> > > > > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
>> > > > > maintaining a uniform upper limit on parallelism across all vertices
>> > > > > within a job. As the source vertices are part of the global execution
>> > > > > graph, they should also respect this rule to ensure consistent
>> > > > > application of parallelism constraints.
>> > > > >
>> > > > > Best,
>> > > > > Junrui
>> > > > >
>> > > > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Fri, Apr 12, 2024 at 02:10:
>> > > > >
>> > > > > > Gentle bump on this question. cc @Becket Qin <becket....@gmail.com>
>> > > > > > as well.
>> > > > > >
>> > > > > > Regards
>> > > > > > Venkata krishnan
>> > > > > >
>> > > > > >
>> > > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
>> > > > > > vsowr...@asu.edu> wrote:
>> > > > > >
>> > > > > > > Thanks for the response, Lijie and Junrui. Sorry for the late
>> > > > > > > reply. A few follow-up questions.
>> > > > > > >
>> > > > > > > > Source can actually ignore this limit because it has no
>> > > > > > > > upstream, but this will lead to semantic inconsistency.
>> > > > > > >
>> > > > > > > Lijie, can you please elaborate on the above comment further?
>> > > > > > > What do you mean when you say it will lead to "semantic
>> > > > > > > inconsistency"?
>> > > > > > >
>> > > > > > > > Secondly, we first need to limit the max parallelism of
>> > > > > > > > (downstream) vertex, and then we can decide how many
>> > > > > > > > subpartitions (upstream vertex) should produce. The limit
>> > > > > > > > should be effective, otherwise some downstream tasks will have
>> > > > > > > > no data to process.
>> > > > > > >
>> > > > > > > This makes sense for any vertex other than the source vertex. As
>> > > > > > > you mentioned above ("Source can actually ignore this limit
>> > > > > > > because it has no upstream"), I feel that
>> > > > > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
>> > > > > > > need not be upper bounded by
>> > > > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
>> > > > > > >
>> > > > > > > Regards
>> > > > > > > Venkata krishnan
>> > > > > > >
>> > > > > > >
>> > > > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:
>> > > > > > >
>> > > > > > >> Hi Venkat,
>> > > > > > >>
>> > > > > > >> As Lijie mentioned, in Flink, the parallelism is required to be
>> > > > > > >> less than or equal to the maximum parallelism. The config options
>> > > > > > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > > > > > >> and jobmanager.adaptive-batch-scheduler.max-parallelism will be
>> > > > > > >> set as the source's parallelism and max-parallelism,
>> > > > > > >> respectively. Therefore, the check failure you encountered is in
>> > > > > > >> line with expectations.
>> > > > > > >>
>> > > > > > >> Best,
>> > > > > > >> Junrui
>> > > > > > >>
>> > > > > > >> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Feb 29, 2024 at 17:35:
>> > > > > > >>
>> > > > > > >> > Hi Venkat,
>> > > > > > >> >
>> > > > > > >> > >> default-source-parallelism config should be independent from
>> > > > > > >> > >> the max-parallelism
>> > > > > > >> >
>> > > > > > >> > Actually, it's not.
>> > > > > > >> >
>> > > > > > >> > Firstly, it's obvious that the parallelism should be less than
>> > > > > > >> > or equal to the max parallelism (both literally and in
>> > > > > > >> > execution). The
>> > > > > > >> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be
>> > > > > > >> > used as the max parallelism for a vertex if you don't set max
>> > > > > > >> > parallelism for it individually (just like the source in your
>> > > > > > >> > case).
>> > > > > > >> >
>> > > > > > >> > Secondly, we first need to limit the max parallelism of
>> > > > > > >> > (downstream) vertex, and then we can decide how many
>> > > > > > >> > subpartitions (upstream vertex) should produce. The limit should
>> > > > > > >> > be effective, otherwise some downstream tasks will have no data
>> > > > > > >> > to process. Source can actually ignore this limit because it has
>> > > > > > >> > no upstream, but this will lead to semantic inconsistency.
>> > > > > > >> >
>> > > > > > >> > Best,
>> > > > > > >> > Lijie
>> > > > > > >> >
>> > > > > > >> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:
>> > > > > > >> >
>> > > > > > >> > > Hi Flink devs,
>> > > > > > >> > >
>> > > > > > >> > > With Flink's AdaptiveBatchScheduler
>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
>> > > > > > >> > > (Note: this is different from AdaptiveScheduler
>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
>> > > > > > >> > > the scheduler automatically determines the correct number of
>> > > > > > >> > > downstream tasks required to process the shuffle generated by
>> > > > > > >> > > the upstream vertex.
>> > > > > > >> > >
>> > > > > > >> > > I have a question regarding the current behavior. Two configs
>> > > > > > >> > > interact here:
>> > > > > > >> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
>> > > > > > >> > >  - The default parallelism of data source.
>> > > > > > >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
>> > > > > > >> > >  - Upper bound of allowed parallelism to set adaptively.
>> > > > > > >> > >
>> > > > > > >> > > Currently, if
>> > > > > > >> > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>"
>> > > > > > >> > > is greater than
>> > > > > > >> > > "jobmanager.adaptive-batch-scheduler.max-parallelism
>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>",
>> > > > > > >> > > the Flink application fails with the below message:
>> > > > > > >> > >
>> > > > > > >> > > "Vertex's parallelism should be smaller than or equal to
>> > > > > > >> > > vertex's max parallelism."
>> > > > > > >> > >
>> > > > > > >> > > This is the corresponding code in Flink's
>> > > > > > >> > > DefaultVertexParallelismInfo
>> > > > > > >> > > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
>> > > > > > >> > > My question is: shouldn't the "default-source-parallelism"
>> > > > > > >> > > config be independent of the "max-parallelism" config? The
>> > > > > > >> > > former controls the default source parallelism while the latter
>> > > > > > >> > > controls the max number of partitions to write the intermediate
>> > > > > > >> > > shuffle.
>> > > > > > >> > >
>> > > > > > >> > > If this is true, then the above check should be fixed.
>> > > > > > >> > > Otherwise, I would like to understand why
>> > > > > > >> > > "default-source-parallelism" must not exceed
>> > > > > > >> > > "max-parallelism".
>> > > > > > >> > >
>> > > > > > >> > > Thanks
>> > > > > > >> > > Venkat
>> > > > > > >> > >
>> > > > > > >> >
>> > > > > > >>
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
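
For readers skimming the thread, here is a minimal sketch of the check being discussed, written in Python for brevity. It is not Flink's code and not what the PR implements. The function names and the `exempt_source` switch are hypothetical, and the way the exemption is modelled (lifting the source's own bound to its configured parallelism) is only an assumption for illustration. The error message and the fallback to the job-wide max parallelism are taken from the messages quoted above.

```python
from typing import Optional


def effective_max_parallelism(vertex_max: Optional[int], global_max: int) -> int:
    """Use the job-wide max when the vertex sets none (fallback described in the thread)."""
    return global_max if vertex_max is None else vertex_max


def check_parallelism(parallelism: int, max_parallelism: int) -> None:
    """Reject a parallelism above the max; the message wording is quoted from the thread."""
    if parallelism > max_parallelism:
        raise ValueError(
            "Vertex's parallelism should be smaller than or equal to "
            "vertex's max parallelism."
        )


def source_passes_check(default_source_parallelism: int,
                        global_max_parallelism: int,
                        exempt_source: bool = False) -> bool:
    """Return True if a source vertex with no max of its own passes the check.

    exempt_source is a hypothetical switch modelling the proposal that the
    source should not be bound by the job-wide max parallelism.
    """
    if exempt_source:
        # Assumption: the source's bound is lifted to its configured parallelism.
        source_max = max(default_source_parallelism, global_max_parallelism)
    else:
        source_max = effective_max_parallelism(None, global_max_parallelism)
    try:
        check_parallelism(default_source_parallelism, source_max)
    except ValueError:
        return False
    return True


if __name__ == "__main__":
    # default-source-parallelism above max-parallelism: rejected under the current rule
    print(source_passes_check(2000, 1000))
    # same settings with the hypothetical exemption for sources
    print(source_passes_check(2000, 1000, exempt_source=True))
```

The sketch only captures the ordering constraint between the two settings. How downstream subpartition counts are derived once the source is exempt is left out.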
