Xia,

Thanks for the reviews. Unfortunately due to work commitments I am little
delayed in addressing your review comments. Mostly will be done by end of
this week. Just a quick heads up.

Jinrui,

Thanks, that would be great.

On Mon, May 6, 2024, 12:45 AM Junrui Lee <jrlee....@gmail.com> wrote:

> Hi,
> Thanks for the reminder. I will review it soon during my free time.
>
> Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年5月4日周六 10:10写道:
>
> > Jinrui and Xia
> >
> > Gentle ping for reviews.
> >
> > On Mon, Apr 29, 2024, 8:28 PM Venkatakrishnan Sowrirajan <
> vsowr...@asu.edu
> > >
> > wrote:
> >
> > > Hi Xia and Jinrui,
> > >
> > > Filed
> https://urldefense.com/v3/__https://github.com/apache/flink/pull/24736__;!!IKRxdwAv5BmarQ!YHVAC2UWD7ITI-Xyk6Flu6WhuSWYsHTLCOtJkLUhtIyojo0OxQOfsQmoS7d9-q0OhcRzbZ0hjO19qbictUrQLQ$
> to address the above
> > > described issue. Please take a look whenever you can.
> > >
> > > Thanks
> > > Venkat
> > >
> > >
> > > On Thu, Apr 18, 2024 at 12:16 PM Venkatakrishnan Sowrirajan <
> > > vsowr...@asu.edu> wrote:
> > >
> > >> Filed
> https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLINK-35165__;!!IKRxdwAv5BmarQ!YHVAC2UWD7ITI-Xyk6Flu6WhuSWYsHTLCOtJkLUhtIyojo0OxQOfsQmoS7d9-q0OhcRzbZ0hjO19qbim4QLkBQ$
> to address the
> > >> above described issue. Will share the PR here once it is ready for
> > review.
> > >>
> > >> Regards
> > >> Venkata krishnan
> > >>
> > >>
> > >> On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com>
> wrote:
> > >>
> > >>> Thanks Venkata and Xia for providing further clarification. I think
> > your
> > >>> example illustrates the significance of this proposal very well.
> Please
> > >>> feel free go ahead and address the concerns.
> > >>>
> > >>> Best,
> > >>> Junrui
> > >>>
> > >>> Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年4月16日周二 07:01写道:
> > >>>
> > >>> > Thanks for adding your thoughts to this discussion.
> > >>> >
> > >>> > If we all agree that the source vertex parallelism shouldn't be
> bound
> > >>> by
> > >>> > the downstream max parallelism
> > >>> > (jobmanager.adaptive-batch-scheduler.max-parallelism)
> > >>> > based on the rationale and the issues described above, I can take a
> > >>> stab at
> > >>> > addressing the issue.
> > >>> >
> > >>> > Let me file a ticket to track this issue. Otherwise, I'm looking
> > >>> forward to
> > >>> > hearing more thoughts from others as well, especially Lijie and
> > Junrui
> > >>> who
> > >>> > have more context on the AdaptiveBatchScheduler.
> > >>> >
> > >>> > Regards
> > >>> > Venkata krishnan
> > >>> >
> > >>> >
> > >>> > On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com>
> > wrote:
> > >>> >
> > >>> > > Hi Venkat,
> > >>> > > I agree that the parallelism of source vertex should not be upper
> > >>> bounded
> > >>> > > by the job's global max parallelism. The case you mentioned, >>
> > High
> > >>> > filter
> > >>> > > selectivity with huge amounts of data to read  excellently
> supports
> > >>> this
> > >>> > > viewpoint. (In fact, in the current implementation, if the source
> > >>> > > parallelism is pre-specified at job create stage, rather than
> > >>> relying on
> > >>> > > the dynamic parallelism inference of the AdaptiveBatchScheduler,
> > the
> > >>> > source
> > >>> > > vertex's parallelism can indeed exceed the job's global max
> > >>> parallelism.)
> > >>> > >
> > >>> > > As Lijie and Junrui pointed out, the key issue is "semantic
> > >>> consistency."
> > >>> > > Currently, if a vertex has not set maxParallelism, the
> > >>> > > AdaptiveBatchScheduler will use
> > >>> > > `execution.batch.adaptive.auto-parallelism.max-parallelism` as
> the
> > >>> > vertex's
> > >>> > > maxParallelism. Since the current implementation does not
> > distinguish
> > >>> > > between source vertices and downstream vertices, source vertices
> > are
> > >>> also
> > >>> > > subject to this limitation.
> > >>> > >
> > >>> > > Therefore, I believe that if the issue of "semantic consistency"
> > can
> > >>> be
> > >>> > > well explained in the code and configuration documentation, the
> > >>> > > AdaptiveBatchScheduler should support that the parallelism of
> > source
> > >>> > > vertices can exceed the job's global max parallelism.
> > >>> > >
> > >>> > > Best,
> > >>> > > Xia
> > >>> > >
> > >>> > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年4月14日周日
> > 10:31写道:
> > >>> > >
> > >>> > > > Let me state why I think "*jobmanager.adaptive-batch-sche*
> > >>> > > > *duler.default-source-parallelism*" should not be bound by the
> "
> > >>> > > > *jobmanager.adaptive-batch-sche**duler.max-parallelism*".
> > >>> > > >
> > >>> > > >    - Source vertex is unique and does not have any upstream
> > >>> vertices
> > >>> > > >    - Downstream vertices read shuffled data partitioned by key,
> > >>> which
> > >>> > is
> > >>> > > >    not the case for the Source vertex
> > >>> > > >    - Limiting source parallelism by downstream vertices' max
> > >>> > parallelism
> > >>> > > is
> > >>> > > >    incorrect
> > >>> > > >
> > >>> > > > If we say for ""semantic consistency" the source vertex
> > >>> parallelism has
> > >>> > > to
> > >>> > > > be bound by the overall job's max parallelism, it can lead to
> > >>> following
> > >>> > > > issues:
> > >>> > > >
> > >>> > > >    - High filter selectivity with huge amounts of data to read
> -
> > >>> > setting
> > >>> > > >    high "*jobmanager.adaptive-batch-scheduler.max-parallelism*"
> > so
> > >>> that
> > >>> > > >    source parallelism can be set higher can lead to small
> blocks
> > >>> and
> > >>> > > >    sub-optimal performance.
> > >>> > > >    - Setting high
> > >>> > "*jobmanager.adaptive-batch-scheduler.max-parallelism*"
> > >>> > > >    requires careful tuning of network buffer configurations
> which
> > >>> is
> > >>> > > >    unnecessary in cases where it is not required just so that
> the
> > >>> > source
> > >>> > > >    parallelism can be set high.
> > >>> > > >
> > >>> > > > Regards
> > >>> > > > Venkata krishnan
> > >>> > > >
> > >>> > > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <
> jrlee....@gmail.com>
> > >>> > wrote:
> > >>> > > >
> > >>> > > > > Hello Venkata krishnan,
> > >>> > > > >
> > >>> > > > > I think the term "semantic inconsistency" defined by
> > >>> > > > > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> > >>> > > > maintaining a
> > >>> > > > > uniform upper limit on parallelism across all vertices
> within a
> > >>> job.
> > >>> > As
> > >>> > > > the
> > >>> > > > > source vertices are part of the global execution graph, they
> > >>> should
> > >>> > > also
> > >>> > > > > respect this rule to ensure consistent application of
> > parallelism
> > >>> > > > > constraints.
> > >>> > > > >
> > >>> > > > > Best,
> > >>> > > > > Junrui
> > >>> > > > >
> > >>> > > > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年4月12日周五
> > >>> 02:10写道:
> > >>> > > > >
> > >>> > > > > > Gentle bump on this question. cc @Becket Qin <
> > >>> becket....@gmail.com
> > >>> > >
> > >>> > > as
> > >>> > > > > > well.
> > >>> > > > > >
> > >>> > > > > > Regards
> > >>> > > > > > Venkata krishnan
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan
> Sowrirajan <
> > >>> > > > > > vsowr...@asu.edu> wrote:
> > >>> > > > > >
> > >>> > > > > > > Thanks for the response Lijie and Junrui. Sorry for the
> > late
> > >>> > reply.
> > >>> > > > Few
> > >>> > > > > > > follow up questions.
> > >>> > > > > > >
> > >>> > > > > > > > Source can actually ignore this limit
> > >>> > > > > > > because it has no upstream, but this will lead to
> semantic
> > >>> > > > > inconsistency.
> > >>> > > > > > >
> > >>> > > > > > > Lijie, can you please elaborate on the above comment
> > further?
> > >>> > What
> > >>> > > do
> > >>> > > > > you
> > >>> > > > > > > mean when you say it will lead to "semantic
> inconsistency"?
> > >>> > > > > > >
> > >>> > > > > > > > Secondly, we first need to limit the max parallelism of
> > >>> > > > (downstream)
> > >>> > > > > > > vertex, and then we can decide how many subpartitions
> > >>> (upstream
> > >>> > > > vertex)
> > >>> > > > > > > should produce. The limit should be effective, otherwise
> > some
> > >>> > > > > downstream
> > >>> > > > > > > tasks will have no data to process.
> > >>> > > > > > >
> > >>> > > > > > > This makes sense in the context of any other vertices
> other
> > >>> than
> > >>> > > the
> > >>> > > > > > > source vertex. As you mentioned above ("Source can
> actually
> > >>> > ignore
> > >>> > > > this
> > >>> > > > > > > limit because it has no upstream"), therefore I feel "
> > >>> > > > > > >
> > >>> jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > >>> > > need
> > >>> > > > > not
> > >>> > > > > > > be upper bounded by
> > >>> > > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > >>> > > > > > >
> > >>> > > > > > > Regards
> > >>> > > > > > > Venkata krishnan
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <
> > >>> jrlee....@gmail.com>
> > >>> > > > > wrote:
> > >>> > > > > > >
> > >>> > > > > > >> Hi Venkat,
> > >>> > > > > > >>
> > >>> > > > > > >> As Lijie mentioned,  in Flink, the parallelism is
> required
> > >>> to be
> > >>> > > > less
> > >>> > > > > > than
> > >>> > > > > > >> or equal to the maximum parallelism. The config option
> > >>> > > > > > >> jobmanager.adaptive-batch-scheduler.max-parallelism and
> > >>> > > > > > >>
> > >>> jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > >>> > > will
> > >>> > > > be
> > >>> > > > > > set
> > >>> > > > > > >> as the source's parallelism and max-parallelism,
> > >>> respectively.
> > >>> > > > > > Therefore,
> > >>> > > > > > >> the check failed situation you encountered is in line
> with
> > >>> the
> > >>> > > > > > >> expectations.
> > >>> > > > > > >>
> > >>> > > > > > >> Best,
> > >>> > > > > > >> Junrui
> > >>> > > > > > >>
> > >>> > > > > > >> Lijie Wang <wangdachui9...@gmail.com> 于2024年2月29日周四
> > >>> 17:35写道:
> > >>> > > > > > >>
> > >>> > > > > > >> > Hi Venkat,
> > >>> > > > > > >> >
> > >>> > > > > > >> > >> default-source-parallelism config should be
> > independent
> > >>> > from
> > >>> > > > the
> > >>> > > > > > >> > max-parallelism
> > >>> > > > > > >> >
> > >>> > > > > > >> > Actually, it's not.
> > >>> > > > > > >> >
> > >>> > > > > > >> > Firstly, it's obvious that the parallelism should be
> > less
> > >>> than
> > >>> > > or
> > >>> > > > > > equal
> > >>> > > > > > >> to
> > >>> > > > > > >> > the max parallelism(both literally and execution). The
> > >>> > > > > > >> > "jobmanager.adaptive-batch-scheduler.max-parallelism"
> > >>> will be
> > >>> > > used
> > >>> > > > > as
> > >>> > > > > > >> the
> > >>> > > > > > >> > max parallelism for a vertex if you don't set max
> > >>> parallelism
> > >>> > > for
> > >>> > > > it
> > >>> > > > > > >> > individually (Just like the source in your case).
> > >>> > > > > > >> >
> > >>> > > > > > >> > Secondly, we first need to limit the max parallelism
> of
> > >>> > > > (downstream)
> > >>> > > > > > >> > vertex, and then we can decide how many subpartitions
> > >>> > (upstream
> > >>> > > > > > vertex)
> > >>> > > > > > >> > should produce. The limit should be effective,
> otherwise
> > >>> some
> > >>> > > > > > downstream
> > >>> > > > > > >> > tasks will have no data to process. Source can
> actually
> > >>> ignore
> > >>> > > > this
> > >>> > > > > > >> limit
> > >>> > > > > > >> > because it has no upstream, but this will lead to
> > semantic
> > >>> > > > > > >> inconsistency.
> > >>> > > > > > >> >
> > >>> > > > > > >> > Best,
> > >>> > > > > > >> > Lijie
> > >>> > > > > > >> >
> > >>> > > > > > >> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
> > >>> 于2024年2月29日周四
> > >>> > > > > 05:49写道:
> > >>> > > > > > >> >
> > >>> > > > > > >> > > Hi Flink devs,
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > With Flink's AdaptiveBatchScheduler
> > >>> > > > > > >> > > <
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-batch-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISrg5BrHLw$
> > >>> > > > > > >> > > >
> > >>> > > > > > >> > > (Note:
> > >>> > > > > > >> > > this is different from AdaptiveScheduler
> > >>> > > > > > >> > > <
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqUzURivw$
> > >>> > > > > > >> > > >),
> > >>> > > > > > >> > > the scheduler automatically determines the correct
> > >>> number of
> > >>> > > > > > >> downstream
> > >>> > > > > > >> > > tasks required to process the shuffle generated by
> the
> > >>> > > upstream
> > >>> > > > > > >> vertex.
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > I have a question regarding the current behavior.
> > There
> > >>> are
> > >>> > 2
> > >>> > > > > > configs
> > >>> > > > > > >> > which
> > >>> > > > > > >> > > are in interplay here.
> > >>> > > > > > >> > > 1.
> > >>> > > > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > >>> > > > > > >> > > <
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
> > >>> > > > > > >> > > >
> > >>> > > > > > >> > >  - The default parallelism of data source.
> > >>> > > > > > >> > > 2.
> jobmanager.adaptive-batch-scheduler.max-parallelism
> > >>> > > > > > >> > > <
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
> > >>> > > > > > >> > > >
> > >>> > > > > > >> > > -
> > >>> > > > > > >> > > Upper bound of allowed parallelism to set
> adaptively.
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > Currently, if "
> > >>> > > > > > >> > >
> > >>> > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > >>> > > > > > >> > > <
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
> > >>> > > > > > >> > > >"
> > >>> > > > > > >> > > is greater than
> > >>> > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism
> > >>> > > > > > >> > > <
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
> > >>> > > > > > >> > > >",
> > >>> > > > > > >> > > Flink application fails with the below message:
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > "Vertex's parallelism should be smaller than or
> equal
> > to
> > >>> > > > vertex's
> > >>> > > > > > max
> > >>> > > > > > >> > > parallelism."
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > This is the corresponding code in Flink's
> > >>> > > > > > DefaultVertexParallelismInfo
> > >>> > > > > > >> > > <
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://urldefense.com/v3/__https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java*L110__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqBRDEfwA$
> > >>> > > > > > >> > > >.
> > >>> > > > > > >> > > My question is, "default-source-parallelism" config
> > >>> should
> > >>> > be
> > >>> > > > > > >> independent
> > >>> > > > > > >> > > from the "max-parallelism" flag. The former controls
> > the
> > >>> > > default
> > >>> > > > > > >> source
> > >>> > > > > > >> > > parallelism while the latter controls the max number
> > of
> > >>> > > > partitions
> > >>> > > > > > to
> > >>> > > > > > >> > write
> > >>> > > > > > >> > > the intermediate shuffle.
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > If this is true, then the above check should be
> fixed.
> > >>> > > > Otherwise,
> > >>> > > > > > >> wanted
> > >>> > > > > > >> > to
> > >>> > > > > > >> > > understand why the "default-source-parallelism`
> should
> > >>> be
> > >>> > less
> > >>> > > > > than
> > >>> > > > > > >> the
> > >>> > > > > > >> > > "max-parallelism"
> > >>> > > > > > >> > >
> > >>> > > > > > >> > > Thanks
> > >>> > > > > > >> > > Venkat
> > >>> > > > > > >> > >
> > >>> > > > > > >> >
> > >>> > > > > > >>
> > >>> > > > > > >
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> >
>

Reply via email to