Hi!

Sorry for the late reply; generally it's best to open a JIRA directly in
these cases.

You are completely right. I have tested this now and it is indeed broken
for 2.1 (it works for 1.20).
I haven't dug very deep, but your proposed fix is probably good. Can you
please open a JIRA/PR for this?

We should definitely add some test coverage that would catch this issue
if it ever regresses.
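For reference, the behavior such a test should pin down is roughly the
following (a self-contained sketch; applyOverrides and the names in it are
illustrative stand-ins, not the actual Dispatcher/Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class ParallelismOverrideSketch {

    // Hypothetical stand-in for the override logic: merge cluster- and
    // job-level override maps (job-level wins) and apply them to a
    // vertexId -> parallelism mapping, ignoring unknown vertex IDs.
    static Map<String, Integer> applyOverrides(
            Map<String, Integer> vertexParallelism,
            Map<String, String> clusterOverrides,
            Map<String, String> jobOverrides) {
        Map<String, String> merged = new HashMap<>(clusterOverrides);
        merged.putAll(jobOverrides); // job configuration takes precedence
        Map<String, Integer> result = new HashMap<>(vertexParallelism);
        merged.forEach((vertexId, parallelism) -> {
            if (result.containsKey(vertexId)) {
                result.put(vertexId, Integer.parseInt(parallelism));
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Both vertices start at the uniform default of 280; the override
        // should pin the source vertex down to its 24 Kafka partitions.
        Map<String, Integer> vertices = new HashMap<>();
        vertices.put("sourceVertexId", 280);
        vertices.put("sinkVertexId", 280);

        Map<String, Integer> adjusted = applyOverrides(
                vertices, Map.of("sourceVertexId", "24"), Map.of());

        System.out.println(adjusted.get("sourceVertexId")); // 24
        System.out.println(adjusted.get("sinkVertexId"));   // 280
    }
}
```

A real test would of course go through Application Mode submission with a
StreamGraph and assert on the resulting JobGraph vertices, so it fails on
the current code path.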

Cheers
Gyula

On Wed, Nov 26, 2025 at 8:16 PM Barak Bar-On <[email protected]> wrote:

> Hi Flink developers,
>
> I believe I've found a potential bug (or possibly missing feature) in Flink
> 2.x where pipeline.jobvertex-parallelism-overrides appears to be ignored
> when running jobs in Application Mode. I wanted to discuss this with the
> community before filing a JIRA.
>
> This is affecting the Flink Kubernetes Operator autoscaler functionality in
> our environment.
>
> *The Problem*
> When using the Flink Kubernetes Operator with autoscaling enabled, the
> autoscaler correctly calculates per-vertex parallelism and writes it to
> pipeline.jobvertex-parallelism-overrides. However, the job always runs with
> uniform parallelism (parallelism.default) instead of the per-vertex
> overrides.
>
> For example, Kafka sources connected to topics with 24 partitions end up
> running with parallelism 280 (the default), wasting resources with 256 idle
> subtasks per source.
>
> *Analysis*
> I traced this to Dispatcher.java. In Flink 2.0, FLINK-36446 changed
> internalSubmitJob() to accept ExecutionPlan instead of JobGraph:
>
> private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan
> executionPlan) {
>     ...
>     if (executionPlan instanceof JobGraph) {
>         applyParallelismOverrides((JobGraph) executionPlan);
>     }
>     ...
> }
>
> In Application Mode, Flink submits a StreamGraph (not JobGraph) to the
> Dispatcher. Since executionPlan instanceof JobGraph is false, the
> parallelism overrides are skipped entirely.
>
> The StreamGraph is later converted to JobGraph in
> DefaultSchedulerFactory.createScheduler():
>
> } else if (executionPlan instanceof StreamGraph) {
>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
> }
>
> But by this point, the override logic has already been bypassed and there's
> no second check.
>
> *Evidence*
> From JobManager logs:
>
>    - Config IS loaded: Loading configuration property:
>    pipeline.jobvertex-parallelism-overrides, ...
>    - Log shows: Added StreamGraph(jobId: xxx) - confirming StreamGraph is
>    used
>    - The message "Changing job vertex {} parallelism from {} to {}" does
>    NOT appear
>    - All tasks run with uniform parallelism instead of configured overrides
>
> *Versions Checked*
> I checked all 2.x branches and the same code pattern exists in:
>
>    - release-2.0
>    - release-2.1
>    - release-2.2
>    - master
>
> *Proposed Fix*
> Apply parallelism overrides in DefaultSchedulerFactory right after the
> StreamGraph → JobGraph conversion:
>
> } else if (executionPlan instanceof StreamGraph) {
>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
>
>     // Apply parallelism overrides to the converted JobGraph
>     Map<String, String> overrides = new HashMap<>();
>     overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
>     overrides.putAll(
>             jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
>
>     for (JobVertex vertex : jobGraph.getVertices()) {
>         String override = overrides.get(vertex.getID().toHexString());
>         if (override != null) {
>             vertex.setParallelism(Integer.parseInt(override));
>         }
>     }
> }
>
> This location works well because the JobGraph already has correct vertex
> IDs and configuration is available.
>
> *Questions*
>
>    1. Is this the expected behavior, or is this a bug/oversight from the
>    FLINK-36446 refactoring?
>    2. Was pipeline.jobvertex-parallelism-overrides ever intended to work
>    with StreamGraph submissions in Application Mode?
>    3. If this is indeed a bug, is the proposed fix location (
>    DefaultSchedulerFactory) appropriate, or would a different approach be
>    preferred?
>    4. Should I create a JIRA issue for this?
>
> I'm happy to provide more details or contribute a fix if the community
> confirms this is a bug.
>
> Thanks.
>
