On second thought, DefaultSchedulerFactory is probably not the right/only
place to fix this, because there are other scheduler factories as well
(e.g., the adaptive scheduler).
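
To make that concrete: below is a minimal sketch of how the logic could be
shared, assuming we extract what the Dispatcher does today into a small
utility that each scheduler factory calls right after the StreamGraph →
JobGraph conversion. The class name and placement are illustrative only,
not a finished design:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

/** Hypothetical shared home for the override logic. */
public final class ParallelismOverrideUtil {

    private ParallelismOverrideUtil() {}

    public static void applyParallelismOverrides(
            JobGraph jobGraph, Configuration configuration) {
        // Cluster-level overrides first; job-level overrides win.
        Map<String, String> overrides = new HashMap<>();
        overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
        overrides.putAll(
                jobGraph.getJobConfiguration()
                        .get(PipelineOptions.PARALLELISM_OVERRIDES));

        // Override each vertex whose hex ID appears in the merged map.
        for (JobVertex vertex : jobGraph.getVertices()) {
            String override = overrides.get(vertex.getID().toHexString());
            if (override != null) {
                vertex.setParallelism(Integer.parseInt(override));
            }
        }
    }
}

That way the Dispatcher path and every scheduler factory would go through
the same code path instead of duplicating the logic.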

On Tue, Dec 2, 2025 at 8:47 AM Gyula Fóra <[email protected]> wrote:

> Hi!
>
> Sorry for the late reply; generally it's best to open a JIRA directly in
> these cases.
>
> You are completely right. I have tested this now, and it seems to be
> completely broken for 2.1 (it works for 1.20).
> I haven't dug very deep, but your proposed fix is probably good. Can you
> please open a JIRA/PR for this?
>
> We should definitely add some test coverage that would catch this issue
> if it ever comes back.
>
> Cheers
> Gyula
>
> On Wed, Nov 26, 2025 at 8:16 PM Barak Bar-On <[email protected]>
> wrote:
>
>> Hi Flink developers,
>>
>> I believe I've found a potential bug (or possibly a missing feature) in
>> Flink 2.x where pipeline.jobvertex-parallelism-overrides appears to be
>> ignored when running jobs in Application Mode. I wanted to discuss this
>> with the community before filing a JIRA.
>>
>> This is affecting the Flink Kubernetes Operator autoscaler functionality
>> in our environment.
>>
>> *The Problem*
>> When using the Flink Kubernetes Operator with autoscaling enabled, the
>> autoscaler correctly calculates per-vertex parallelism and writes it to
>> pipeline.jobvertex-parallelism-overrides. However, the job always runs
>> with uniform parallelism (parallelism.default) instead of the per-vertex
>> overrides.
>>
>> For example, Kafka sources connected to topics with 24 partitions end up
>> running with parallelism 280 (the default), wasting resources with 256
>> idle subtasks per source.
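>>
>> For context, the effective configuration in that scenario looks roughly
>> like this (the vertex IDs below are made up for illustration; real ones
>> are the 32-character JobVertexID hex strings):
>>
>> parallelism.default: 280
>> pipeline.jobvertex-parallelism-overrides: 0a448493b4782967b150582570326227:24,ea632d67b7d595e5b851708ae9ad79d6:24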
>>
>> *Analysis*
>> I traced this to Dispatcher.java. In Flink 2.0, FLINK-36446 changed
>> internalSubmitJob() to accept ExecutionPlan instead of JobGraph:
>>
>> private CompletableFuture<Acknowledge> internalSubmitJob(
>>         ExecutionPlan executionPlan) {
>>     ...
>>     if (executionPlan instanceof JobGraph) {
>>         applyParallelismOverrides((JobGraph) executionPlan);
>>     }
>>     ...
>> }
>>
>> In Application Mode, Flink submits a StreamGraph (not JobGraph) to the
>> Dispatcher. Since executionPlan instanceof JobGraph is false, the
>> parallelism overrides are skipped entirely.
>>
>> The StreamGraph is later converted to JobGraph in
>> DefaultSchedulerFactory.createScheduler():
>>
>> } else if (executionPlan instanceof StreamGraph) {
>>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
>> }
>>
>> But by this point, the override logic has already been bypassed and
>> there's no second check.
>>
>> *Evidence*
>> From the JobManager logs:
>>
>>    - The config IS loaded: Loading configuration property:
>>      pipeline.jobvertex-parallelism-overrides, ...
>>    - The log shows Added StreamGraph(jobId: xxx), confirming that a
>>      StreamGraph is used
>>    - The message "Changing job vertex {} parallelism from {} to {}" does
>>      NOT appear
>>    - All tasks run with uniform parallelism instead of the configured
>>      overrides
>>
>> *Versions Checked*
>> I checked all 2.x branches and the same code pattern exists in:
>>
>>    - release-2.0
>>    - release-2.1
>>    - release-2.2
>>    - master
>>
>> *Proposed Fix*
>> Apply parallelism overrides in DefaultSchedulerFactory right after the
>> StreamGraph → JobGraph conversion:
>>
>> } else if (executionPlan instanceof StreamGraph) {
>>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
>>
>>     // Apply parallelism overrides to the converted JobGraph, merging
>>     // cluster-level config with job-level config (job-level wins),
>>     // as Dispatcher.applyParallelismOverrides does today.
>>     Map<String, String> overrides = new HashMap<>();
>>     overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
>>     overrides.putAll(
>>             jobGraph.getJobConfiguration()
>>                     .get(PipelineOptions.PARALLELISM_OVERRIDES));
>>
>>     for (JobVertex vertex : jobGraph.getVertices()) {
>>         String override = overrides.get(vertex.getID().toHexString());
>>         if (override != null) {
>>             vertex.setParallelism(Integer.parseInt(override));
>>         }
>>     }
>> }
>>
>> This location works well because, at that point, the JobGraph already
>> has the correct vertex IDs and the configuration is available.
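>>
>> To give future regressions something to trip over, a test along these
>> lines could assert that overrides survive the StreamGraph submission
>> path. This is only a rough sketch: applyParallelismOverrides stands in
>> for wherever the fix ends up living, and @Test/assertEquals are the
>> usual JUnit pieces:
>>
>> @Test
>> void testOverridesAppliedToConvertedJobGraph() {
>>     JobVertex vertex = new JobVertex("source");
>>     vertex.setParallelism(280);
>>     JobGraph jobGraph = new JobGraph(new JobID(), "test-job");
>>     jobGraph.addVertex(vertex);
>>
>>     // Configure an override keyed by the vertex's hex ID.
>>     Configuration conf = new Configuration();
>>     conf.set(
>>             PipelineOptions.PARALLELISM_OVERRIDES,
>>             Map.of(vertex.getID().toHexString(), "24"));
>>
>>     // Hypothetical helper containing the override logic from above.
>>     applyParallelismOverrides(jobGraph, conf);
>>
>>     assertEquals(24, vertex.getParallelism());
>> }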
>>
>> *Questions*
>>
>>    1. Is this the expected behavior, or is this a bug/oversight from the
>>       FLINK-36446 refactoring?
>>    2. Was pipeline.jobvertex-parallelism-overrides ever intended to work
>>       with StreamGraph submissions in Application Mode?
>>    3. If this is indeed a bug, is the proposed fix location
>>       (DefaultSchedulerFactory) appropriate, or would a different
>>       approach be preferred?
>>    4. Should I create a JIRA issue for this?
>>
>> I'm happy to provide more details or contribute a fix if the community
>> confirms this is a bug.
>>
>> Thanks.
>>
>
