Hi Flink developers,

I believe I've found a bug (or possibly a missing feature) in Flink 2.x:
pipeline.jobvertex-parallelism-overrides appears to be ignored when jobs
run in Application Mode. I wanted to discuss this with the community
before filing a JIRA.

This is affecting the Flink Kubernetes Operator autoscaler functionality in
our environment.

*The Problem*
When using the Flink Kubernetes Operator with autoscaling enabled, the
autoscaler correctly calculates per-vertex parallelism and writes it to
pipeline.jobvertex-parallelism-overrides. However, the job always runs with
uniform parallelism (parallelism.default) instead of the per-vertex
overrides.

For example, Kafka sources reading topics with 24 partitions end up running
at parallelism 280 (the default), wasting resources with 256 idle subtasks
per source.
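For reference, the option is a map from job-vertex ID (in hex) to
parallelism. Assuming Flink's standard map-option syntax, the autoscaler
ends up writing something like the following (the vertex IDs below are
placeholders, not real IDs from our job):

```yaml
# Sketch of the autoscaler-written configuration; vertex IDs are made up.
pipeline.jobvertex-parallelism-overrides: "bc764cd8ddf7a0cff126f51c16239658:24,ea632d67b7d595e5b851708ae9ad79d6:24"
parallelism.default: 280
```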

*Analysis*
I traced this to Dispatcher.java. In Flink 2.0, FLINK-36446 changed
internalSubmitJob() to accept ExecutionPlan instead of JobGraph:

private CompletableFuture<Acknowledge> internalSubmitJob(
        ExecutionPlan executionPlan) {
    ...
    if (executionPlan instanceof JobGraph) {
        applyParallelismOverrides((JobGraph) executionPlan);
    }
    ...
}

In Application Mode, Flink submits a StreamGraph (not JobGraph) to the
Dispatcher. Since executionPlan instanceof JobGraph is false, the
parallelism overrides are skipped entirely.

The StreamGraph is later converted to JobGraph in
DefaultSchedulerFactory.createScheduler():

} else if (executionPlan instanceof StreamGraph) {
    jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
}

But by this point, the override logic has already been bypassed and there's
no second check.
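To make the control flow concrete, here is a tiny self-contained sketch of
why the guard never fires for StreamGraph submissions. The types here are
minimal stand-ins, not Flink's real classes:

```java
public class InstanceofSketch {
    // Stand-in type hierarchy mirroring ExecutionPlan/JobGraph/StreamGraph.
    interface ExecutionPlan {}
    static final class JobGraph implements ExecutionPlan {}
    static final class StreamGraph implements ExecutionPlan {}

    // Mirrors the guard in Dispatcher.internalSubmitJob().
    static boolean overridesWouldApply(ExecutionPlan plan) {
        return plan instanceof JobGraph;
    }

    public static void main(String[] args) {
        System.out.println(overridesWouldApply(new JobGraph()));    // true
        System.out.println(overridesWouldApply(new StreamGraph())); // false: overrides skipped
    }
}
```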

*Evidence*
From the JobManager logs:

   - The config IS loaded: "Loading configuration property:
   pipeline.jobvertex-parallelism-overrides, ..." appears in the logs.
   - "Added StreamGraph(jobId: xxx)" appears, confirming a StreamGraph
   submission.
   - The message "Changing job vertex {} parallelism from {} to {}" does
   NOT appear.
   - All tasks run with uniform parallelism instead of the configured
   overrides.

*Versions Checked*
I checked all 2.x branches and the same code pattern exists in:

   - release-2.0
   - release-2.1
   - release-2.2
   - master

*Proposed Fix*
Apply parallelism overrides in DefaultSchedulerFactory right after the
StreamGraph → JobGraph conversion:

} else if (executionPlan instanceof StreamGraph) {
    jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);

    // Apply parallelism overrides to the converted JobGraph
    Map<String, String> overrides = new HashMap<>();
    overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
    overrides.putAll(
            jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));

    for (JobVertex vertex : jobGraph.getVertices()) {
        String override = overrides.get(vertex.getID().toHexString());
        if (override != null) {
            vertex.setParallelism(Integer.parseInt(override));
        }
    }
}

This location works well because the JobGraph already has its final vertex
IDs at this point and the configuration is available.
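To illustrate the intended semantics of the fix, here is a self-contained
sketch of the override loop using minimal stand-in classes (Vertex below is
a mock, not Flink's JobVertex): vertices listed in the override map get
their parallelism replaced, everything else keeps the default.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OverrideSketch {
    // Minimal stand-in for a JobVertex; not the real Flink API.
    static final class Vertex {
        final String idHex;
        int parallelism;
        Vertex(String idHex, int parallelism) {
            this.idHex = idHex;
            this.parallelism = parallelism;
        }
    }

    // Applies per-vertex overrides, leaving unlisted vertices untouched.
    static void applyOverrides(List<Vertex> vertices, Map<String, String> overrides) {
        for (Vertex v : vertices) {
            String override = overrides.get(v.idHex);
            if (override != null) {
                v.parallelism = Integer.parseInt(override);
            }
        }
    }

    public static void main(String[] args) {
        // Two vertices starting at parallelism.default = 280.
        Vertex source = new Vertex("aaaa", 280);
        Vertex sink = new Vertex("bbbb", 280);
        Map<String, String> overrides = new HashMap<>();
        overrides.put("aaaa", "24"); // autoscaler-computed parallelism
        applyOverrides(List.of(source, sink), overrides);
        System.out.println(source.parallelism + " " + sink.parallelism); // 24 280
    }
}
```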

*Questions*

   1. Is this the expected behavior, or is this a bug/oversight from the
   FLINK-36446 refactoring?
   2. Was pipeline.jobvertex-parallelism-overrides ever intended to work
   with StreamGraph submissions in Application Mode?
   3. If this is indeed a bug, is the proposed fix location (
   DefaultSchedulerFactory) appropriate, or would a different approach be
   preferred?
   4. Should I create a JIRA issue for this?

I'm happy to provide more details or contribute a fix if the community
confirms this is a bug.

Thanks.
