Hi Venkat,

As Lijie mentioned, in Flink the parallelism is required to be less than
or equal to the maximum parallelism. The config options
jobmanager.adaptive-batch-scheduler.default-source-parallelism and
jobmanager.adaptive-batch-scheduler.max-parallelism will be used as the
source's parallelism and max parallelism, respectively. Therefore, the
failed check you encountered is in line with expectations.
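
As an illustration, here is a minimal Python sketch of that check (this is not Flink's actual Java code in DefaultVertexParallelismInfo; the function name and the sample values are hypothetical):

```python
# Hypothetical sketch of the vertex parallelism check that fails when
# default-source-parallelism exceeds max-parallelism. Not Flink code.

def check_vertex_parallelism(parallelism: int, max_parallelism: int) -> None:
    """Raise if the vertex's parallelism exceeds its max parallelism."""
    if parallelism > max_parallelism:
        raise ValueError(
            "Vertex's parallelism should be smaller than or equal to "
            "vertex's max parallelism."
        )

# Absent an individual setting, the source vertex takes
# default-source-parallelism as its parallelism and the scheduler's
# max-parallelism as its max parallelism (sample values below):
default_source_parallelism = 128  # ...default-source-parallelism
scheduler_max_parallelism = 64    # ...max-parallelism

try:
    check_vertex_parallelism(default_source_parallelism,
                             scheduler_max_parallelism)
except ValueError as e:
    print(f"Job fails: {e}")
```

With the sample values above, the check raises and the job fails at scheduling time, which matches the error message you saw.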

Best,
Junrui

Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Feb 29, 2024 at 17:35:

> Hi Venkat,
>
> >> default-source-parallelism config should be independent from the
> max-parallelism
>
> Actually, it's not.
>
> Firstly, it's obvious that the parallelism should be less than or equal to
> the max parallelism (both literally and in execution). The
> "jobmanager.adaptive-batch-scheduler.max-parallelism" value will be used as
> the max parallelism for a vertex if you don't set a max parallelism for it
> individually (just like the source in your case).
>
> Secondly, we first need to limit the max parallelism of the (downstream)
> vertex, and only then can we decide how many subpartitions the (upstream)
> vertex should produce. The limit must be effective; otherwise some
> downstream tasks will have no data to process. A source could actually
> ignore this limit because it has no upstream, but that would lead to
> semantic inconsistency.
>
> Best,
> Lijie
>
Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:
>
> > Hi Flink devs,
> >
> > With Flink's AdaptiveBatchScheduler
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
> > (note: this is different from the AdaptiveScheduler
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
> > the scheduler automatically determines the correct number of downstream
> > tasks required to process the shuffle generated by the upstream vertex.
> >
> > I have a question regarding the current behavior. Two configs are in
> > interplay here:
> > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
> > - the default parallelism of the data source.
> > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
> > - the upper bound of allowed parallelism to set adaptively.
> >
> > Currently, if
> > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
> > is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism"
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>,
> > the Flink application fails with the message:
> >
> > "Vertex's parallelism should be smaller than or equal to vertex's max
> > parallelism."
> >
> > This is the corresponding code in Flink's DefaultVertexParallelismInfo
> > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> > My question is: shouldn't the "default-source-parallelism" config be
> > independent of the "max-parallelism" flag? The former controls the
> > default source parallelism, while the latter controls the max number of
> > partitions to write the intermediate shuffle.
> >
> > If this is true, then the above check should be fixed. Otherwise, I
> > wanted to understand why "default-source-parallelism" should be less
> > than "max-parallelism".
> >
> > Thanks
> > Venkat
> >
>