Hi! Sorry for the late reply; generally it's best to open a JIRA directly in these cases.
You are completely right. I have tested this now and it seems to be completely
broken for 2.1 (it works for 1.20). I haven't dug very deep, but your proposed
fix is probably good. Can you please open a JIRA/PR for this? We should
definitely add test coverage that would catch this issue if it comes back; I've
put a rough sketch of what I had in mind below the quoted message.

Cheers,
Gyula

On Wed, Nov 26, 2025 at 8:16 PM Barak Bar-On <[email protected]> wrote:

> Hi Flink developers,
>
> I believe I've found a potential bug (or possibly a missing feature) in
> Flink 2.x where pipeline.jobvertex-parallelism-overrides appears to be
> ignored when running jobs in Application Mode. I wanted to discuss this
> with the community before filing a JIRA.
>
> This is affecting the Flink Kubernetes Operator autoscaler functionality
> in our environment.
>
> *The Problem*
> When using the Flink Kubernetes Operator with autoscaling enabled, the
> autoscaler correctly calculates per-vertex parallelism and writes it to
> pipeline.jobvertex-parallelism-overrides. However, the job always runs
> with uniform parallelism (parallelism.default) instead of the per-vertex
> overrides.
>
> For example, Kafka sources connected to topics with 24 partitions end up
> running with parallelism 280 (the default), wasting resources with 256
> idle subtasks per source.
>
> *Analysis*
> I traced this to Dispatcher.java. In Flink 2.0, FLINK-36446 changed
> internalSubmitJob() to accept ExecutionPlan instead of JobGraph:
>
>     private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
>         ...
>         if (executionPlan instanceof JobGraph) {
>             applyParallelismOverrides((JobGraph) executionPlan);
>         }
>         ...
>     }
>
> In Application Mode, Flink submits a StreamGraph (not a JobGraph) to the
> Dispatcher. Since executionPlan instanceof JobGraph is false, the
> parallelism overrides are skipped entirely.
>
> The StreamGraph is later converted to a JobGraph in
> DefaultSchedulerFactory.createScheduler():
>
>     } else if (executionPlan instanceof StreamGraph) {
>         jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
>     }
>
> But by this point, the override logic has already been bypassed and there
> is no second check.
>
> *Evidence*
> From the JobManager logs:
>
> - The config IS loaded: "Loading configuration property:
>   pipeline.jobvertex-parallelism-overrides, ..."
> - The log shows "Added StreamGraph(jobId: xxx)", confirming a StreamGraph
>   is used.
> - The message "Changing job vertex {} parallelism from {} to {}" does NOT
>   appear.
> - All tasks run with uniform parallelism instead of the configured
>   overrides.
>
> *Versions Checked*
> I checked all 2.x branches and the same code pattern exists in:
>
> - release-2.0
> - release-2.1
> - release-2.2
> - master
>
> *Proposed Fix*
> Apply the parallelism overrides in DefaultSchedulerFactory right after the
> StreamGraph → JobGraph conversion:
>
>     } else if (executionPlan instanceof StreamGraph) {
>         jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
>
>         // Apply parallelism overrides to the converted JobGraph
>         Map<String, String> overrides = new HashMap<>();
>         overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
>         overrides.putAll(
>                 jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
>
>         for (JobVertex vertex : jobGraph.getVertices()) {
>             String override = overrides.get(vertex.getID().toHexString());
>             if (override != null) {
>                 vertex.setParallelism(Integer.parseInt(override));
>             }
>         }
>     }
>
> This location works well because the JobGraph already has the correct
> vertex IDs and the configuration is available.
>
> *Questions*
>
> 1. Is this the expected behavior, or is it a bug/oversight from the
>    FLINK-36446 refactoring?
> 2. Was pipeline.jobvertex-parallelism-overrides ever intended to work with
>    StreamGraph submissions in Application Mode?
> 3. If this is indeed a bug, is the proposed fix location
>    (DefaultSchedulerFactory) appropriate, or would a different approach be
>    preferred?
> 4. Should I create a JIRA issue for this?
>
> I'm happy to provide more details or contribute a fix if the community
> confirms this is a bug.
>
> Thanks.
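Regarding the test coverage I mentioned above, here is a rough sketch of what I
had in mind. It is not wired into any existing Flink test class (the class name
is a placeholder), the StreamGraph-to-JobGraph conversion call follows the
snippet you quoted, and the override loop mirrors your proposed fix. It only
exercises the override logic against a StreamGraph-derived JobGraph; a proper
regression test should additionally go through the Dispatcher/scheduler
submission path.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    import org.junit.jupiter.api.Test;

    import static org.assertj.core.api.Assertions.assertThat;

    class StreamGraphParallelismOverrideTest {

        @Test
        void overridesApplyToJobGraphConvertedFromStreamGraph() {
            // Build a small pipeline and take its StreamGraph, which is what
            // Application Mode hands to the Dispatcher.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.fromData(1, 2, 3)
                    .map(new MapFunction<Integer, Integer>() {
                        @Override
                        public Integer map(Integer value) {
                            return value * 2;
                        }
                    })
                    .print();
            StreamGraph streamGraph = env.getStreamGraph();

            // Convert to a JobGraph the same way the scheduler factory does.
            JobGraph jobGraph =
                    streamGraph.getJobGraph(Thread.currentThread().getContextClassLoader());

            // Configure an override of 2 for every vertex of the converted graph.
            Map<String, String> overrides = new HashMap<>();
            for (JobVertex vertex : jobGraph.getVertices()) {
                overrides.put(vertex.getID().toHexString(), "2");
            }
            Configuration configuration = new Configuration();
            configuration.set(PipelineOptions.PARALLELISM_OVERRIDES, overrides);

            // Apply the overrides the way the proposed fix would.
            Map<String, String> configuredOverrides =
                    configuration.get(PipelineOptions.PARALLELISM_OVERRIDES);
            for (JobVertex vertex : jobGraph.getVertices()) {
                String override = configuredOverrides.get(vertex.getID().toHexString());
                if (override != null) {
                    vertex.setParallelism(Integer.parseInt(override));
                }
            }

            // Every vertex should now run with the overridden parallelism
            // rather than the pipeline defaults.
            for (JobVertex vertex : jobGraph.getVertices()) {
                assertThat(vertex.getParallelism()).isEqualTo(2);
            }
        }
    }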
