Hi Flink developers,
I believe I've found a potential bug (or possibly a missing feature) in Flink
2.x: pipeline.jobvertex-parallelism-overrides appears to be ignored when
jobs run in Application Mode. I wanted to discuss this with the community
before filing a JIRA.
This is affecting the Flink Kubernetes Operator autoscaler functionality in
our environment.
*The Problem*
When using the Flink Kubernetes Operator with autoscaling enabled, the
autoscaler correctly calculates per-vertex parallelism and writes it to
pipeline.jobvertex-parallelism-overrides. However, the job always runs with
uniform parallelism (parallelism.default) instead of the per-vertex
overrides.
For example, Kafka sources reading topics with 24 partitions end up running
with parallelism 280 (the default), wasting resources on 256 idle subtasks
per source.
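For reference, the overrides are a map-typed config option; in flink-conf.yaml the shape looks roughly like this (the vertex IDs below are made up for illustration):

```yaml
# Hypothetical entries: "jobVertexIdHex:parallelism", comma-separated.
# The autoscaler computes and writes this map per vertex.
pipeline.jobvertex-parallelism-overrides: bc764cd8ddf7a0cff126f51c16239658:24,ea632d67b7d595e5b851708ae9ad79d6:24
parallelism.default: 280
```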
*Analysis*
I traced this to Dispatcher.java. In Flink 2.0, FLINK-36446 changed
internalSubmitJob() to accept an ExecutionPlan instead of a JobGraph:
private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
    ...
    if (executionPlan instanceof JobGraph) {
        applyParallelismOverrides((JobGraph) executionPlan);
    }
    ...
}
In Application Mode, Flink submits a StreamGraph (not JobGraph) to the
Dispatcher. Since executionPlan instanceof JobGraph is false, the
parallelism overrides are skipped entirely.
The StreamGraph is later converted to JobGraph in
DefaultSchedulerFactory.createScheduler():
} else if (executionPlan instanceof StreamGraph) {
    jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
}
But by this point the override logic has already been bypassed, and there is
no second check.
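To make the control-flow gap concrete, here is a minimal self-contained sketch (hypothetical stand-in classes, not Flink's actual ones) of the dispatch behavior described above: overrides are applied only behind an instanceof JobGraph check, so a StreamGraph submission bypasses them even though it is converted to a JobGraph later.

```java
public class DispatchGapSketch {
    interface ExecutionPlan {}

    static class JobGraph implements ExecutionPlan {}

    static class StreamGraph implements ExecutionPlan {
        // In real Flink, the scheduler factory performs this conversion later,
        // after the Dispatcher's override check has already run.
        JobGraph toJobGraph() {
            return new JobGraph();
        }
    }

    // Mirrors internalSubmitJob(): returns true iff overrides were applied.
    static boolean overridesApplied(ExecutionPlan plan) {
        if (plan instanceof JobGraph) {
            return true; // applyParallelismOverrides((JobGraph) plan) runs here
        }
        return false; // StreamGraph path: overrides silently skipped
    }
}
```

The sketch shows why there is no second chance: by the time the StreamGraph becomes a JobGraph, the only call site that applies overrides has already returned.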
*Evidence*
From the JobManager logs:
- The configuration IS loaded: "Loading configuration property:
pipeline.jobvertex-parallelism-overrides, ..."
- "Added StreamGraph(jobId: xxx)" appears, confirming a StreamGraph was
submitted
- The message "Changing job vertex {} parallelism from {} to {}" does
NOT appear
- All tasks run with uniform parallelism instead of the configured overrides
*Versions Checked*
I checked all 2.x branches and the same code pattern exists in:
- release-2.0
- release-2.1
- release-2.2
- master
*Proposed Fix*
Apply parallelism overrides in DefaultSchedulerFactory right after the
StreamGraph → JobGraph conversion:
} else if (executionPlan instanceof StreamGraph) {
    jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
    // Apply parallelism overrides to the converted JobGraph,
    // mirroring Dispatcher.applyParallelismOverrides()
    Map<String, String> overrides = new HashMap<>();
    overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
    overrides.putAll(
            jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
    for (JobVertex vertex : jobGraph.getVertices()) {
        String override = overrides.get(vertex.getID().toHexString());
        if (override != null) {
            vertex.setParallelism(Integer.parseInt(override));
        }
    }
}
This location works well because the JobGraph already has its final vertex
IDs and the cluster configuration is available.
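To spell out the intended semantics of that merge, here is a small self-contained sketch (plain Java, not Flink code; the method and map names are my own) of the two steps the fix performs. Note the job-level map is applied second, so its entries take precedence over the cluster-level ones, matching the putAll order above.

```java
import java.util.HashMap;
import java.util.Map;

public class OverrideSketch {
    // Merge cluster- and job-level override maps; job-level entries win.
    static Map<String, String> merge(Map<String, String> cluster, Map<String, String> job) {
        Map<String, String> merged = new HashMap<>(cluster);
        merged.putAll(job); // later putAll overwrites duplicate keys
        return merged;
    }

    // Apply overrides to a vertexId -> parallelism map; unknown IDs are ignored,
    // just as the JobVertex loop only touches vertices present in the graph.
    static Map<String, Integer> apply(Map<String, Integer> vertices, Map<String, String> overrides) {
        Map<String, Integer> result = new HashMap<>(vertices);
        overrides.forEach((id, p) -> {
            if (result.containsKey(id)) {
                result.put(id, Integer.parseInt(p));
            }
        });
        return result;
    }
}
```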
*Questions*
1. Is this the expected behavior, or is this a bug/oversight from the
FLINK-36446 refactoring?
2. Was pipeline.jobvertex-parallelism-overrides ever intended to work
with StreamGraph submissions in Application Mode?
3. If this is indeed a bug, is the proposed fix location (
DefaultSchedulerFactory) appropriate, or would a different approach be
preferred?
4. Should I create a JIRA issue for this?
I'm happy to provide more details or contribute a fix if the community
confirms this is a bug.
Thanks.