Hi Venkat,

As Lijie mentioned, in Flink the parallelism is required to be less than or equal to the max parallelism. The config options jobmanager.adaptive-batch-scheduler.default-source-parallelism and jobmanager.adaptive-batch-scheduler.max-parallelism will be used as the source's parallelism and max parallelism, respectively. Therefore, the failed check you encountered is in line with expectations.
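For concreteness, here is a minimal flink-conf.yaml fragment (the values are purely illustrative, not taken from this thread) that would trigger the failed check, since the source's parallelism (200) would exceed its max parallelism (100):

```yaml
# Illustrative values only: 200 > 100, so the vertex parallelism check fails
# with "Vertex's parallelism should be smaller than or equal to vertex's
# max parallelism."
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 200
jobmanager.adaptive-batch-scheduler.max-parallelism: 100
```

Raising max-parallelism to at least 200 (or lowering default-source-parallelism to at most 100) makes the check pass.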
Best,
Junrui

Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Feb 29, 2024 at 17:35:

> Hi Venkat,
>
> >> default-source-parallelism config should be independent from the
> >> max-parallelism
>
> Actually, it's not.
>
> Firstly, it's obvious that the parallelism should be less than or equal to
> the max parallelism (both literally and in execution). The
> "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the
> max parallelism for a vertex if you don't set a max parallelism for it
> individually (just like the source in your case).
>
> Secondly, we first need to limit the max parallelism of the (downstream)
> vertex, and only then can we decide how many subpartitions the (upstream)
> vertex should produce. The limit should be effective, otherwise some
> downstream tasks will have no data to process. A source could actually
> ignore this limit because it has no upstream, but that would lead to
> semantic inconsistency.
>
> Best,
> Lijie
>
> Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:
>
> > Hi Flink devs,
> >
> > With Flink's AdaptiveBatchScheduler
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
> > (note: this is different from the AdaptiveScheduler
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
> > the scheduler automatically determines the correct number of downstream
> > tasks required to process the shuffle generated by the upstream vertex.
> >
> > I have a question regarding the current behavior. There are two configs
> > in interplay here:
> > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
> > - the default parallelism of a data source.
> > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
> > - the upper bound of parallelism allowed to be set adaptively.
> >
> > Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism",
> > the Flink application fails with the message:
> >
> > "Vertex's parallelism should be smaller than or equal to vertex's max
> > parallelism."
> >
> > This is the corresponding code in Flink's DefaultVertexParallelismInfo
> > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> > My question is: shouldn't the "default-source-parallelism" config be
> > independent of the "max-parallelism" flag? The former controls the
> > default source parallelism, while the latter controls the max number of
> > partitions to write the intermediate shuffle into.
> >
> > If this is true, then the above check should be fixed. Otherwise, I
> > wanted to understand why "default-source-parallelism" should be less
> > than or equal to "max-parallelism".
> >
> > Thanks,
> > Venkat
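The check being discussed can be sketched as follows. This is a simplified stand-in for the validation in DefaultVertexParallelismInfo, not Flink's actual code, and the values are hypothetical, chosen only to reproduce the failure described above:

```java
// Simplified sketch of the vertex parallelism validation discussed in this
// thread. Not Flink's actual implementation; values are hypothetical.
public class VertexParallelismCheck {

    // A vertex's parallelism must not exceed its max parallelism.
    static boolean isValid(int parallelism, int maxParallelism) {
        return parallelism <= maxParallelism;
    }

    public static void main(String[] args) {
        // default-source-parallelism acts as the source's parallelism;
        // max-parallelism acts as its max parallelism when no per-vertex
        // max parallelism is set.
        int sourceParallelism = 200;
        int maxParallelism = 100;

        if (!isValid(sourceParallelism, maxParallelism)) {
            // Mirrors the error message quoted in the thread.
            System.out.println(
                "Vertex's parallelism should be smaller than or equal to "
                    + "vertex's max parallelism.");
        }
    }
}
```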