First of all, my sincere apologies for the slow progress on this one.

@Junrui Lee <jrlee....@gmail.com> and Xia,

I updated the PR to address the last set of review comments. Please take a
look. I am hoping to finish this one quickly.

Regards
Venkata krishnan


On Thu, May 9, 2024 at 4:35 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
wrote:

> Xia,
>
> Thanks for the reviews. Unfortunately, due to work commitments, I am a little
> delayed in addressing your review comments. They should mostly be done by the
> end of this week. Just a quick heads up.
>
> Junrui,
>
> Thanks, that would be great.
>
> On Mon, May 6, 2024, 12:45 AM Junrui Lee <jrlee....@gmail.com> wrote:
>
>> Hi,
>> Thanks for the reminder. I will review it soon during my free time.
>>
>> On Sat, May 4, 2024 at 10:10, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>>
>> > Junrui and Xia,
>> >
>> > Gentle ping for reviews.
>> >
>> > On Mon, Apr 29, 2024, 8:28 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
>> > wrote:
>> >
>> > > Hi Xia and Junrui,
>> > >
>> > > Filed https://github.com/apache/flink/pull/24736 to address the issue
>> > > described above. Please take a look whenever you can.
>> > >
>> > > Thanks
>> > > Venkat
>> > >
>> > >
>> > > On Thu, Apr 18, 2024 at 12:16 PM Venkatakrishnan Sowrirajan <
>> > > vsowr...@asu.edu> wrote:
>> > >
>> > >> Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the
>> > >> issue described above. Will share the PR here once it is ready for
>> > >> review.
>> > >>
>> > >> Regards
>> > >> Venkata krishnan
>> > >>
>> > >>
>> > >> On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com>
>> > >> wrote:
>> > >>
>> > >>> Thanks Venkata and Xia for providing further clarification. I think
>> > >>> your example illustrates the significance of this proposal very well.
>> > >>> Please feel free to go ahead and address the concerns.
>> > >>>
>> > >>> Best,
>> > >>> Junrui
>> > >>>
>> > >>> On Tue, Apr 16, 2024 at 07:01, Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
>> > >>> wrote:
>> > >>>
>> > >>> > Thanks for adding your thoughts to this discussion.
>> > >>> >
>> > >>> > If we all agree that the source vertex parallelism shouldn't be bound
>> > >>> > by the downstream max parallelism
>> > >>> > (jobmanager.adaptive-batch-scheduler.max-parallelism)
>> > >>> > based on the rationale and the issues described above, I can take a
>> > >>> > stab at addressing the issue.
>> > >>> >
>> > >>> > Let me file a ticket to track this issue. Otherwise, I'm looking
>> > >>> > forward to hearing more thoughts from others as well, especially Lijie
>> > >>> > and Junrui, who have more context on the AdaptiveBatchScheduler.
>> > >>> >
>> > >>> > Regards
>> > >>> > Venkata krishnan
>> > >>> >
>> > >>> >
>> > >>> > On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com>
>> > >>> > wrote:
>> > >>> >
>> > >>> > > Hi Venkat,
>> > >>> > > I agree that the parallelism of the source vertex should not be upper
>> > >>> > > bounded by the job's global max parallelism. The case you mentioned,
>> > >>> > > "High filter selectivity with huge amounts of data to read", supports
>> > >>> > > this viewpoint very well. (In fact, in the current implementation, if
>> > >>> > > the source parallelism is pre-specified at the job creation stage,
>> > >>> > > rather than relying on the dynamic parallelism inference of the
>> > >>> > > AdaptiveBatchScheduler, the source vertex's parallelism can indeed
>> > >>> > > exceed the job's global max parallelism.)
>> > >>> > >
>> > >>> > > As Lijie and Junrui pointed out, the key issue is "semantic
>> > >>> > > consistency." Currently, if a vertex has not set maxParallelism, the
>> > >>> > > AdaptiveBatchScheduler will use
>> > >>> > > `execution.batch.adaptive.auto-parallelism.max-parallelism` as the
>> > >>> > > vertex's maxParallelism. Since the current implementation does not
>> > >>> > > distinguish between source vertices and downstream vertices, source
>> > >>> > > vertices are also subject to this limitation.
>> > >>> > >
>> > >>> > > Therefore, I believe that if the issue of "semantic consistency" can
>> > >>> > > be well explained in the code and configuration documentation, the
>> > >>> > > AdaptiveBatchScheduler should allow the parallelism of source
>> > >>> > > vertices to exceed the job's global max parallelism.
>> > >>> > >
>> > >>> > > Best,
>> > >>> > > Xia
>> > >>> > >
>> > >>> > > On Sun, Apr 14, 2024 at 10:31, Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
>> > >>> > > wrote:
>> > >>> > >
>> > >>> > > > Let me state why I think
>> > >>> > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
>> > >>> > > > should not be bound by
>> > >>> > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
>> > >>> > > >
>> > >>> > > >    - The source vertex is unique and does not have any upstream
>> > >>> > > >    vertices
>> > >>> > > >    - Downstream vertices read shuffled data partitioned by key,
>> > >>> > > >    which is not the case for the source vertex
>> > >>> > > >    - Limiting source parallelism by the downstream vertices' max
>> > >>> > > >    parallelism is incorrect
>> > >>> > > >
>> > >>> > > > If we say that, for "semantic consistency", the source vertex
>> > >>> > > > parallelism has to be bound by the overall job's max parallelism,
>> > >>> > > > it can lead to the following issues:
>> > >>> > > >
>> > >>> > > >    - High filter selectivity with huge amounts of data to read:
>> > >>> > > >    setting a high
>> > >>> > > >    "jobmanager.adaptive-batch-scheduler.max-parallelism" so that
>> > >>> > > >    the source parallelism can be set higher can lead to small
>> > >>> > > >    blocks and sub-optimal performance.
>> > >>> > > >    - Setting a high
>> > >>> > > >    "jobmanager.adaptive-batch-scheduler.max-parallelism" requires
>> > >>> > > >    careful tuning of network buffer configurations, which should
>> > >>> > > >    not be necessary just so that the source parallelism can be set
>> > >>> > > >    high.
>> > >>> > > >
>> > >>> > > > Regards
>> > >>> > > > Venkata krishnan
>> > >>> > > >
>> > >>> > > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com>
>> > >>> > > > wrote:
>> > >>> > > >
>> > >>> > > > > Hello Venkata krishnan,
>> > >>> > > > >
>> > >>> > > > > I think the "semantic inconsistency" concern around
>> > >>> > > > > jobmanager.adaptive-batch-scheduler.max-parallelism is about
>> > >>> > > > > maintaining a uniform upper limit on parallelism across all
>> > >>> > > > > vertices within a job. As the source vertices are part of the
>> > >>> > > > > global execution graph, they should also respect this rule to
>> > >>> > > > > ensure consistent application of parallelism constraints.
>> > >>> > > > >
>> > >>> > > > > Best,
>> > >>> > > > > Junrui
>> > >>> > > > >
>> > >>> > > > > On Fri, Apr 12, 2024 at 02:10, Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
>> > >>> > > > > wrote:
>> > >>> > > > >
>> > >>> > > > > > Gentle bump on this question. cc @Becket Qin
>> > >>> > > > > > <becket....@gmail.com> as well.
>> > >>> > > > > >
>> > >>> > > > > > Regards
>> > >>> > > > > > Venkata krishnan
>> > >>> > > > > >
>> > >>> > > > > >
>> > >>> > > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
>> > >>> > > > > > vsowr...@asu.edu> wrote:
>> > >>> > > > > >
>> > >>> > > > > > > Thanks for the response, Lijie and Junrui. Sorry for the
>> > >>> > > > > > > late reply. A few follow-up questions.
>> > >>> > > > > > >
>> > >>> > > > > > > > Source can actually ignore this limit
>> > >>> > > > > > > > because it has no upstream, but this will lead to semantic
>> > >>> > > > > > > > inconsistency.
>> > >>> > > > > > >
>> > >>> > > > > > > Lijie, can you please elaborate on the above comment? What
>> > >>> > > > > > > do you mean when you say it will lead to "semantic
>> > >>> > > > > > > inconsistency"?
>> > >>> > > > > > >
>> > >>> > > > > > > > Secondly, we first need to limit the max parallelism of
>> > >>> > > > > > > > (downstream) vertex, and then we can decide how many
>> > >>> > > > > > > > subpartitions (upstream vertex) should produce. The limit
>> > >>> > > > > > > > should be effective, otherwise some downstream tasks will
>> > >>> > > > > > > > have no data to process.
>> > >>> > > > > > >
>> > >>> > > > > > > This makes sense for any vertex other than the source
>> > >>> > > > > > > vertex. As you mentioned above ("Source can actually ignore
>> > >>> > > > > > > this limit because it has no upstream"), I feel that
>> > >>> > > > > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
>> > >>> > > > > > > need not be upper bounded by
>> > >>> > > > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
>> > >>> > > > > > >
>> > >>> > > > > > > Regards
>> > >>> > > > > > > Venkata krishnan
>> > >>> > > > > > >
>> > >>> > > > > > >
>> > >>> > > > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com>
>> > >>> > > > > > > wrote:
>> > >>> > > > > > >
>> > >>> > > > > > >> Hi Venkat,
>> > >>> > > > > > >>
>> > >>> > > > > > >> As Lijie mentioned, in Flink, the parallelism is required
>> > >>> > > > > > >> to be less than or equal to the maximum parallelism. The
>> > >>> > > > > > >> config options
>> > >>> > > > > > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > >>> > > > > > >> and jobmanager.adaptive-batch-scheduler.max-parallelism
>> > >>> > > > > > >> will be set as the source's parallelism and
>> > >>> > > > > > >> max-parallelism, respectively. Therefore, the failed check
>> > >>> > > > > > >> you encountered is in line with expectations.
>> > >>> > > > > > >>
>> > >>> > > > > > >> Best,
>> > >>> > > > > > >> Junrui
>> > >>> > > > > > >>
>> > >>> > > > > > >> On Thu, Feb 29, 2024 at 17:35, Lijie Wang <wangdachui9...@gmail.com>
>> > >>> > > > > > >> wrote:
>> > >>> > > > > > >>
>> > >>> > > > > > >> > Hi Venkat,
>> > >>> > > > > > >> >
>> > >>> > > > > > >> > >> default-source-parallelism config should be independent
>> > >>> > > > > > >> > >> from the max-parallelism
>> > >>> > > > > > >> >
>> > >>> > > > > > >> > Actually, it's not.
>> > >>> > > > > > >> >
>> > >>> > > > > > >> > Firstly, it's obvious that the parallelism should be less
>> > >>> > > > > > >> > than or equal to the max parallelism (both literally and
>> > >>> > > > > > >> > in execution). The
>> > >>> > > > > > >> > "jobmanager.adaptive-batch-scheduler.max-parallelism"
>> > >>> > > > > > >> > will be used as the max parallelism for a vertex if you
>> > >>> > > > > > >> > don't set a max parallelism for it individually (just
>> > >>> > > > > > >> > like the source in your case).
>> > >>> > > > > > >> >
>> > >>> > > > > > >> > Secondly, we first need to limit the max parallelism of
>> > >>> > > > > > >> > the (downstream) vertex, and then we can decide how many
>> > >>> > > > > > >> > subpartitions the (upstream) vertex should produce. The
>> > >>> > > > > > >> > limit should be effective, otherwise some downstream
>> > >>> > > > > > >> > tasks will have no data to process. The source can
>> > >>> > > > > > >> > actually ignore this limit because it has no upstream,
>> > >>> > > > > > >> > but this will lead to semantic inconsistency.
>> > >>> > > > > > >> >
>> > >>> > > > > > >> > Best,
>> > >>> > > > > > >> > Lijie
>> > >>> > > > > > >> >
>> > >>> > > > > > >> > On Thu, Feb 29, 2024 at 05:49, Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
>> > >>> > > > > > >> > wrote:
>> > >>> > > > > > >> >
>> > >>> > > > > > >> > > Hi Flink devs,
>> > >>> > > > > > >> > >
>> > >>> > > > > > >> > > With Flink's AdaptiveBatchScheduler
>> > >>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
>> > >>> > > > > > >> > > (Note: this is different from AdaptiveScheduler
>> > >>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
>> > >>> > > > > > >> > > the scheduler automatically determines the correct
>> > >>> > > > > > >> > > number of downstream tasks required to process the
>> > >>> > > > > > >> > > shuffle generated by the upstream vertex.
>> > >>> > > > > > >> > >
>> > >>> > > > > > >> > > I have a question regarding the current behavior. There
>> > >>> > > > > > >> > > are 2 configs which interact here.
>> > >>> > > > > > >> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > >>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
>> > >>> > > > > > >> > >  - The default parallelism of data source.
>> > >>> > > > > > >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
>> > >>> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
>> > >>> > > > > > >> > >  - Upper bound of allowed parallelism to set adaptively.
>> > >>> > > > > > >> > >
>> > >>> > > > > > >> > > Currently, if
>> > >>> > > > > > >> > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
>> > >>> > > > > > >> > > is greater than
>> > >>> > > > > > >> > > "jobmanager.adaptive-batch-scheduler.max-parallelism",
>> > >>> > > > > > >> > > the Flink application fails with the below message:
>> > >>> > > > > > >> > >
>> > >>> > > > > > >> > > "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
>> > >>> > > > > > >> > >
>> > >>> > > > > > >> > > This is the corresponding code in Flink's
>> > >>> > > > > > >> > > DefaultVertexParallelismInfo
>> > >>> > > > > > >> > > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
>> > >>> > > > > > >> > > My question is: shouldn't the
>> > >>> > > > > > >> > > "default-source-parallelism" config be independent of
>> > >>> > > > > > >> > > the "max-parallelism" config? The former controls the
>> > >>> > > > > > >> > > default source parallelism, while the latter controls
>> > >>> > > > > > >> > > the max number of partitions to write the intermediate
>> > >>> > > > > > >> > > shuffle.
>> > >>> > > > > > >> > >
>> > >>> > > > > > >> > > If this is true, then the above check should be fixed.
>> > >>> > > > > > >> > > Otherwise, I wanted to understand why the
>> > >>> > > > > > >> > > "default-source-parallelism" should be less than the
>> > >>> > > > > > >> > > "max-parallelism".
>> > >>> > > > > > >> > > Thanks
>> > >>> > > > > > >> > > Venkat
>> > >>> > > > > > >> > >
>> > >>> > > > > > >> >
>> > >>> > > > > > >>
>> > >>> > > > > > >
>> > >>> > > > > >
>> > >>> > > > >
>> > >>> > > >
>> > >>> > >
>> > >>> >
>> > >>>
>> > >>
>> >
>>
>
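
The check debated in this thread can be illustrated with a minimal Python sketch. It is not Flink code and does not claim to reflect what the PR does. The names `Vertex`, `effective_max_parallelism`, `check_vertex` and the `exempt_sources` switch are hypothetical and exist only for this illustration. The sketch assumes that a vertex without its own max parallelism falls back to the job-wide limit, and shows how exempting source vertices from that fallback would let a source parallelism above the job-wide limit pass the check.

```python
from dataclasses import dataclass
from typing import Optional

# Error text as quoted in the first message of the thread.
ERROR = "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."


@dataclass(frozen=True)
class Vertex:
    """Hypothetical stand-in for a job vertex; not a Flink class."""
    name: str
    is_source: bool
    parallelism: int
    max_parallelism: Optional[int] = None  # None means "not set individually"


def effective_max_parallelism(vertex: Vertex, global_max: int,
                              exempt_sources: bool = False) -> int:
    """Return the upper bound that applies to this vertex."""
    if vertex.max_parallelism is not None:
        return vertex.max_parallelism
    if exempt_sources and vertex.is_source:
        # Assumed variant of the proposal: a source has no upstream shuffle,
        # so the job-wide limit is not applied to it.
        return max(global_max, vertex.parallelism)
    # Behaviour described in the thread: fall back to the job-wide limit.
    return global_max


def check_vertex(vertex: Vertex, global_max: int,
                 exempt_sources: bool = False) -> int:
    """Raise if the vertex parallelism exceeds its effective max parallelism."""
    limit = effective_max_parallelism(vertex, global_max, exempt_sources)
    if vertex.parallelism > limit:
        raise ValueError(ERROR)
    return limit
```

With a job-wide limit of 1000, a source configured with parallelism 2000 fails the check in the default mode and passes it when `exempt_sources=True`. Downstream vertices are bounded by the job-wide limit in both modes.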
