Junrui and Xia

Gentle ping for reviews.
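
For anyone picking up the review without rereading the whole thread, here is a minimal Python sketch of the check being discussed and of the proposed relaxation. It is illustrative only: `Vertex`, `effective_max_parallelism`, `check_parallelism`, and the `exempt_sources` flag are hypothetical names, not Flink classes or config options, and the PR may well implement the change differently.

```python
from dataclasses import dataclass
from typing import Optional

ERROR = "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."


@dataclass(frozen=True)
class Vertex:
    """Hypothetical stand-in for a job vertex; not a Flink class."""
    name: str
    is_source: bool
    parallelism: int
    max_parallelism: Optional[int] = None  # None: not set on the vertex itself


def effective_max_parallelism(vertex: Vertex, global_max: int,
                              exempt_sources: bool) -> Optional[int]:
    """Return the upper bound applied to this vertex, or None if no bound applies."""
    if vertex.max_parallelism is not None:
        return vertex.max_parallelism      # a per-vertex setting always wins
    if exempt_sources and vertex.is_source:
        return None                        # proposed: sources skip the global bound
    return global_max                      # current: every vertex falls back to it


def check_parallelism(vertex: Vertex, global_max: int,
                      exempt_sources: bool = False) -> None:
    """Raise ValueError if the vertex's parallelism exceeds its effective bound."""
    bound = effective_max_parallelism(vertex, global_max, exempt_sources)
    if bound is not None and vertex.parallelism > bound:
        raise ValueError(ERROR)
```

With a global max of 100, a source whose parallelism is 500 fails `check_parallelism` by default and passes with `exempt_sources=True`, while a non-source vertex with the same parallelism fails in both cases.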

On Mon, Apr 29, 2024, 8:28 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
wrote:

> Hi Xia and Junrui,
>
> Filed https://github.com/apache/flink/pull/24736 to address the above
> described issue. Please take a look whenever you can.
>
> Thanks
> Venkat
>
>
> On Thu, Apr 18, 2024 at 12:16 PM Venkatakrishnan Sowrirajan <
> vsowr...@asu.edu> wrote:
>
>> Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the
>> above described issue. Will share the PR here once it is ready for review.
>>
>> Regards
>> Venkata krishnan
>>
>>
>> On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com> wrote:
>>
>>> Thanks Venkata and Xia for providing further clarification. I think your
>>> example illustrates the significance of this proposal very well. Please
>>> feel free to go ahead and address the concerns.
>>>
>>> Best,
>>> Junrui
>>>
>>> On Tue, Apr 16, 2024 at 07:01 Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>>>
>>> > Thanks for adding your thoughts to this discussion.
>>> >
>>> > If we all agree that the source vertex parallelism shouldn't be bound by
>>> > the downstream max parallelism
>>> > (jobmanager.adaptive-batch-scheduler.max-parallelism)
>>> > based on the rationale and the issues described above, I can take a stab at
>>> > addressing the issue.
>>> >
>>> > Let me file a ticket to track this issue. Otherwise, I'm looking forward to
>>> > hearing more thoughts from others as well, especially Lijie and Junrui who
>>> > have more context on the AdaptiveBatchScheduler.
>>> >
>>> > Regards
>>> > Venkata krishnan
>>> >
>>> >
>>> > On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:
>>> >
>>> > > Hi Venkat,
>>> > > I agree that the parallelism of the source vertex should not be upper-bounded
>>> > > by the job's global max parallelism. The case you mentioned, "High filter
>>> > > selectivity with huge amounts of data to read", supports this viewpoint
>>> > > well. (In fact, in the current implementation, if the source
>>> > > parallelism is pre-specified at job creation time, rather than relying on
>>> > > the dynamic parallelism inference of the AdaptiveBatchScheduler, the source
>>> > > vertex's parallelism can indeed exceed the job's global max parallelism.)
>>> > >
>>> > > As Lijie and Junrui pointed out, the key issue is "semantic consistency."
>>> > > Currently, if a vertex has not set maxParallelism, the
>>> > > AdaptiveBatchScheduler will use
>>> > > `execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's
>>> > > maxParallelism. Since the current implementation does not distinguish
>>> > > between source vertices and downstream vertices, source vertices are also
>>> > > subject to this limitation.
>>> > >
>>> > > Therefore, I believe that if the issue of "semantic consistency" can be
>>> > > well explained in the code and configuration documentation, the
>>> > > AdaptiveBatchScheduler should allow the parallelism of source
>>> > > vertices to exceed the job's global max parallelism.
>>> > >
>>> > > Best,
>>> > > Xia
>>> > >
>>> > > On Sun, Apr 14, 2024 at 10:31 Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>>> > >
>>> > > > Let me state why I think
>>> > > > "*jobmanager.adaptive-batch-scheduler.default-source-parallelism*" should
>>> > > > not be bound by "*jobmanager.adaptive-batch-scheduler.max-parallelism*".
>>> > > >
>>> > > >    - Source vertex is unique and does not have any upstream vertices
>>> > > >    - Downstream vertices read shuffled data partitioned by key, which is
>>> > > >    not the case for the Source vertex
>>> > > >    - Limiting source parallelism by downstream vertices' max parallelism is
>>> > > >    incorrect
>>> > > >
>>> > > > If we say that, for "semantic consistency", the source vertex parallelism has to
>>> > > > be bound by the overall job's max parallelism, it can lead to the following
>>> > > > issues:
>>> > > >
>>> > > >    - High filter selectivity with huge amounts of data to read: setting a
>>> > > >    high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" just so that
>>> > > >    source parallelism can be set higher can lead to small blocks and
>>> > > >    sub-optimal performance.
>>> > > >    - Setting a high "*jobmanager.adaptive-batch-scheduler.max-parallelism*"
>>> > > >    requires careful tuning of network buffer configurations, which is
>>> > > >    unnecessary work when the only goal is to allow a higher source
>>> > > >    parallelism.
>>> > > >
>>> > > > Regards
>>> > > > Venkata krishnan
>>> > > >
>>> > > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:
>>> > > >
>>> > > > > Hello Venkata krishnan,
>>> > > > >
>>> > > > > I think the "semantic consistency" associated with
>>> > > > > jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining a
>>> > > > > uniform upper limit on parallelism across all vertices within a job. As the
>>> > > > > source vertices are part of the global execution graph, they should also
>>> > > > > respect this rule to ensure consistent application of parallelism
>>> > > > > constraints.
>>> > > > >
>>> > > > > Best,
>>> > > > > Junrui
>>> > > > >
>>> > > > > On Fri, Apr 12, 2024 at 02:10 Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>>> > > > >
>>> > > > > > Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as
>>> > > > > > well.
>>> > > > > >
>>> > > > > > Regards
>>> > > > > > Venkata krishnan
>>> > > > > >
>>> > > > > >
>>> > > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
>>> > > > > > vsowr...@asu.edu> wrote:
>>> > > > > >
>>> > > > > > > Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few
>>> > > > > > > follow-up questions.
>>> > > > > > >
>>> > > > > > > > Source can actually ignore this limit
>>> > > > > > > > because it has no upstream, but this will lead to semantic inconsistency.
>>> > > > > > >
>>> > > > > > > Lijie, can you please elaborate on the above comment further? What do you
>>> > > > > > > mean when you say it will lead to "semantic inconsistency"?
>>> > > > > > >
>>> > > > > > > > Secondly, we first need to limit the max parallelism of (downstream)
>>> > > > > > > > vertex, and then we can decide how many subpartitions (upstream vertex)
>>> > > > > > > > should produce. The limit should be effective, otherwise some downstream
>>> > > > > > > > tasks will have no data to process.
>>> > > > > > >
>>> > > > > > > This makes sense for any vertex other than the source vertex. As you
>>> > > > > > > mentioned above ("Source can actually ignore this limit because it has no
>>> > > > > > > upstream"), I feel
>>> > > > > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not
>>> > > > > > > be upper-bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
>>> > > > > > >
>>> > > > > > > Regards
>>> > > > > > > Venkata krishnan
>>> > > > > > >
>>> > > > > > >
>>> > > > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:
>>> > > > > > >
>>> > > > > > >> Hi Venkat,
>>> > > > > > >>
>>> > > > > > >> As Lijie mentioned, in Flink, the parallelism is required to be less than
>>> > > > > > >> or equal to the maximum parallelism. The config options
>>> > > > > > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism and
>>> > > > > > >> jobmanager.adaptive-batch-scheduler.max-parallelism will be set
>>> > > > > > >> as the source's parallelism and max-parallelism, respectively. Therefore,
>>> > > > > > >> the failed check you encountered is in line with expectations.
>>> > > > > > >>
>>> > > > > > >> Best,
>>> > > > > > >> Junrui
>>> > > > > > >>
>>> > > > > > >> On Thu, Feb 29, 2024 at 17:35 Lijie Wang <wangdachui9...@gmail.com> wrote:
>>> > > > > > >>
>>> > > > > > >> > Hi Venkat,
>>> > > > > > >> >
>>> > > > > > >> > >> default-source-parallelism config should be independent from the
>>> > > > > > >> > >> max-parallelism
>>> > > > > > >> >
>>> > > > > > >> > Actually, it's not.
>>> > > > > > >> >
>>> > > > > > >> > Firstly, it's obvious that the parallelism should be less than or equal to
>>> > > > > > >> > the max parallelism (both literally and in execution). The
>>> > > > > > >> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the
>>> > > > > > >> > max parallelism for a vertex if you don't set max parallelism for it
>>> > > > > > >> > individually (just like the source in your case).
>>> > > > > > >> >
>>> > > > > > >> > Secondly, we first need to limit the max parallelism of (downstream)
>>> > > > > > >> > vertex, and then we can decide how many subpartitions (upstream vertex)
>>> > > > > > >> > should produce. The limit should be effective, otherwise some downstream
>>> > > > > > >> > tasks will have no data to process. Source can actually ignore this limit
>>> > > > > > >> > because it has no upstream, but this will lead to semantic
>>> > > > > > >> > inconsistency.
>>> > > > > > >> >
>>> > > > > > >> > Best,
>>> > > > > > >> > Lijie
>>> > > > > > >> >
>>> > > > > > >> > On Thu, Feb 29, 2024 at 05:49 Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>>> > > > > > >> >
>>> > > > > > >> > > Hi Flink devs,
>>> > > > > > >> > >
>>> > > > > > >> > > With Flink's AdaptiveBatchScheduler
>>> > > > > > >> > > <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-batch-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISrg5BrHLw$>
>>> > > > > > >> > > (Note: this is different from AdaptiveScheduler
>>> > > > > > >> > > <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqUzURivw$>),
>>> > > > > > >> > > the scheduler automatically determines the correct number of downstream
>>> > > > > > >> > > tasks required to process the shuffle generated by the upstream vertex.
>>> > > > > > >> > >
>>> > > > > > >> > > I have a question regarding the current behavior. There are 2 configs which
>>> > > > > > >> > > are in interplay here.
>>> > > > > > >> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
>>> > > > > > >> > > <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$>
>>> > > > > > >> > >  - The default parallelism of data source.
>>> > > > > > >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
>>> > > > > > >> > > <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$>
>>> > > > > > >> > >  - Upper bound of allowed parallelism to set adaptively.
>>> > > > > > >> > >
>>> > > > > > >> > > Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism
>>> > > > > > >> > > <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$>"
>>> > > > > > >> > > is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism
>>> > > > > > >> > > <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$>",
>>> > > > > > >> > > the Flink application fails with the below message:
>>> > > > > > >> > >
>>> > > > > > >> > > "Vertex's parallelism should be smaller than or equal to vertex's max
>>> > > > > > >> > > parallelism."
>>> > > > > > >> > >
>>> > > > > > >> > > This is the corresponding code in Flink's DefaultVertexParallelismInfo
>>> > > > > > >> > > <https://urldefense.com/v3/__https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java*L110__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqBRDEfwA$>.
>>> > > > > > >> > > My question is: shouldn't the "default-source-parallelism" config be
>>> > > > > > >> > > independent from the "max-parallelism" flag? The former controls the default
>>> > > > > > >> > > source parallelism while the latter controls the max number of partitions to
>>> > > > > > >> > > write the intermediate shuffle.
>>> > > > > > >> > >
>>> > > > > > >> > > If this is true, then the above check should be fixed. Otherwise, I wanted to
>>> > > > > > >> > > understand why the "default-source-parallelism" should be less than the
>>> > > > > > >> > > "max-parallelism".
>>> > > > > > >> > >
>>> > > > > > >> > > Thanks
>>> > > > > > >> > > Venkat
>>> > > > > > >> > >
>>> > > > > > >> >
>>> > > > > > >>
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
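
Lijie's second point in the thread above (the downstream max parallelism decides how many subpartitions the upstream vertex produces, and a downstream task beyond that count has no data to process) can be illustrated with a small Python sketch. The even, contiguous split used here is an assumption made for illustration and is not Flink's actual assignment logic; `assign_subpartitions` and `idle_tasks` are hypothetical names.

```python
from typing import List


def assign_subpartitions(num_subpartitions: int,
                         downstream_parallelism: int) -> List[range]:
    """Split subpartition indices 0..n-1 into contiguous ranges, one per downstream task."""
    ranges = []
    for task in range(downstream_parallelism):
        start = task * num_subpartitions // downstream_parallelism
        end = (task + 1) * num_subpartitions // downstream_parallelism
        ranges.append(range(start, end))  # empty when start == end
    return ranges


def idle_tasks(num_subpartitions: int, downstream_parallelism: int) -> List[int]:
    """Return the indices of downstream tasks that receive no subpartition."""
    assigned = assign_subpartitions(num_subpartitions, downstream_parallelism)
    return [task for task, r in enumerate(assigned) if len(r) == 0]
```

With 4 subpartitions and 6 downstream tasks, `idle_tasks(4, 6)` reports two tasks with nothing to read, whereas 4 tasks or fewer leaves none idle. A source vertex consumes no upstream subpartitions at all, which is why this particular argument does not constrain its parallelism.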
