[
https://issues.apache.org/jira/browse/FLINK-39299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dennis-Mircea Ciupitu updated FLINK-39299:
------------------------------------------
Description:
h1. 1. Summary
The parallelism alignment logic in *JobVertexScaler#scale* (responsible for
adjusting computed parallelism to align with {*}numKeyGroupsOrPartitions{*})
contains several correctness issues that can cause scaling decisions to be
silently cancelled, inverted in direction, or made to overshoot the target
excessively. These problems are most impactful for Kafka source vertices with
partition counts that are prime or have few small divisors (e.g., 3, 5, 7, 11,
13), which are very common in production deployments.
Additionally, the existing two modes ({*}EVENLY_SPREAD{*} and
{*}MAXIMIZE_UTILISATION{*}) do not offer sufficient granularity.
{*}EVENLY_SPREAD{*}, despite its name, actually runs both an upward
outside-range search and a downward relaxed fallback (making it behave like a
combined mode), while *MAXIMIZE_UTILISATION* is purely upward. There is no way
to configure a truly strict within-range divisor-only mode, nor a mode that
simply accepts the computed target as-is when every subtask has work.
Furthermore, there is no mechanism to compose modes (e.g., try a strict
alignment first, then fall back to a more relaxed strategy).
h1. 2. Root Cause
The alignment logic consists of two loops:
1. *First loop (upward search,
[L554–L564|https://github.com/apache/flink-kubernetes-operator/blob/56af6bf3a94494923497e709faf29fb3749cd45f/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L554]):*
Starting from {*}newParallelism{*}, searches upward to
*upperBoundForAlignment* for a value *p* such that *N % p == 0* (or satisfies
the *MAXIMIZE_UTILISATION* condition).
2. *Fallback loop (downward search,
[L569–L577|https://github.com/apache/flink-kubernetes-operator/blob/56af6bf3a94494923497e709faf29fb3749cd45f/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L570]):*
If the first loop fails, searches downward from *newParallelism* toward 0 for
the next "partition boundary" where the integer division *N / p* increases.
Neither loop is aware of the *scaling direction* (scale-up vs. scale-down) or
the {*}current parallelism{*}. This causes the following systematic issues.
h1. 3. Notation
Throughout this report:
- *N* = *numKeyGroupsOrPartitions*
- *cur* = *currentParallelism*
- *new* = *newParallelism* (after capping by bounds at L536)
- *UB* = *upperBoundForAlignment* = *min(N, upperBound)*
- *q* = *⌊N / new⌋* (integer division)
- *divisors(N, [a, b])* = the set of divisors of N in the range [a, b]
When the first loop fails, the fallback loop result is computed as:
{code}
q = ⌊N / new⌋
p_raw = ⌊N / (q + 1)⌋ // largest p where ⌊N/p⌋ jumps above q
result = (N % p_raw ≠ 0) ? p_raw + 1 : p_raw
result = max(result, parallelismLowerLimit)
{code}
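For clarity, the two loops can be condensed into a small standalone sketch. This is a simplified reimplementation of the behavior described above (EVENLY_SPREAD modulo check only), not the actual JobVertexScaler code:

```java
// Simplified sketch of the current alignment behavior (EVENLY_SPREAD
// condition only); a reimplementation for illustration, not the operator code.
public final class AlignmentSketch {

    /** Upward divisor search from newP to ub; on failure, the closed-form fallback. */
    public static int align(int n, int newP, int ub, int lowerLimit) {
        // First loop: search upward for an exact divisor of n.
        for (int p = newP; p <= ub; p++) {
            if (n % p == 0) {
                return p;
            }
        }
        // Fallback loop, in the closed form given above.
        int q = n / newP;                               // q = floor(N / new)
        int pRaw = n / (q + 1);                         // largest p where floor(N/p) > q
        int result = (n % pRaw != 0) ? pRaw + 1 : pRaw;
        return Math.max(result, lowerLimit);
    }

    public static void main(String[] args) {
        System.out.println(align(3, 2, 2, 1));   // 1: lands on cur=1, scale-up dropped
        System.out.println(align(10, 8, 8, 1));  // 5: below cur=6, direction inverted
        System.out.println(align(10, 8, 10, 5)); // 10: 25% overshoot past new=8
    }
}
```

Note that the helper never sees *currentParallelism*, which is exactly the root cause described above: every problem case in Section 4 follows from feeding it different (N, new, UB) combinations.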
h1. 4. Problem Cases
h2. 4.1. Scale-Up (*cur < new*)
h3. 4.1.1. Case 1: Scale-up completely cancelled (returns *cur*)
*General formula:*
{code}
Preconditions:
1. cur < new // scale-up intent
2. divisors(N, [new, UB]) = ∅ // first loop fails
3. fallback_result == cur // fallback lands on current
Result: scale() returns cur → computeScaleTargetParallelism sees
new == cur → noChange(). Scale-up silently dropped.
{code}
This is *not* restricted to *cur < N/2*. It applies whenever the fallback
rounding lands exactly on *cur*, which depends on the divisor structure of N
relative to *new*.
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 1 |
| newParallelism (after capping) | 2 |
| numSourcePartitions (N) | 3 |
| bounds \[lower, upper\] | \[1, 2\] |
- *q = ⌊3/2⌋ = 1*, *p_raw = ⌊3/2⌋ = 1*, *3 % 1 = 0* → result = *1* = cur →
*noChange*.
- A Kafka source with 3 partitions at parallelism 2 is perfectly valid.
Another example (showing *cur > N/2* is also affected):
- cur=3, new=4, N=5, bounds \[1, 4\]
- *q = ⌊5/4⌋ = 1*, *p_raw = ⌊5/2⌋ = 2*, *5 % 2 ≠ 0* → result = *3* = cur →
*noChange*.
h3. 4.1.2. Case 2: Scale-up inverted into scale-down
*General formula:*
{code}
Preconditions:
1. cur < new // scale-up intent
2. divisors(N, [new, UB]) = ∅ // first loop fails
3. fallback_result < cur // fallback drops BELOW current
Result: scale() returns fallback_result < cur →
detectBlockScaling sees scaledUp = (cur < fallback_result) = false
→ enters the SCALE-DOWN path. Direction completely inverted.
{code}
This is particularly likely when *cur > ⌊N/2⌋*, because divisors of N tend to
cluster at the low end (*≤ N/2*) and at N itself. When UB prevents reaching N,
the fallback jumps to a divisor well below *cur*.
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 6 |
| newParallelism (after capping) | 8 |
| numSourcePartitions (N) | 10 |
| bounds \[lower, upper\] | \[1, 8\] |
- First loop: *p=8 → 10%8=2≠0* → no match.
- *q = ⌊10/8⌋ = 1*, *p_raw = ⌊10/2⌋ = 5*, *10 % 5 = 0* → result = *5*.
- *5 < cur(6)* → *detectBlockScaling*: *scaledUp = (6 < 5) = false* →
*scale-down path*.
h3. 4.1.3. Case 3: *MAXIMIZE_UTILISATION* mode does not resolve Cases 1 and 2
*General formula:*
{code}
The MAXIMIZE_UTILISATION condition in the first loop (L559-L561) is:
⌊N / p⌋ < ⌊N / new⌋
This has two structural flaws:
A) Self-comparison: At p = new (first iteration), this is x < x → always FALSE.
MAXIMIZE_UTILISATION can never accept newParallelism itself.
B) Integer division plateau: When new > ⌊N/2⌋, then ⌊N/p⌋ = 1 for ALL p
in [⌈N/2⌉, N-1]. The condition becomes 1 < 1 → FALSE across the entire
search range. MAXIMIZE_UTILISATION becomes equivalent to EVENLY_SPREAD.
Therefore: All of Cases 1-2 reproduce identically under MAXIMIZE_UTILISATION
whenever new > ⌊N/2⌋.
{code}
Concrete example (Case 1 under MAXIMIZE\_UTILISATION):
- cur=1, new=2, N=3, bounds \[1, 2\]:
- *p=2 → 3%2≠0*. MAXIMIZE\_UTILISATION: *⌊3/2⌋=1 < ⌊3/2⌋=1? NO*.
- Falls through to same fallback → returns *1* → *noChange*.
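The plateau in flaw B is easy to verify numerically; this throwaway snippet prints *⌊N/p⌋* for N = 10 over the upper half of the range:

```java
// Demonstrates the integer-division plateau: for N = 10, floor(N / p) == 1
// for every p in [6, 9], so floor(N/p) < floor(N/new) can never hold there.
public final class PlateauDemo {
    public static void main(String[] args) {
        int n = 10;
        for (int p = 6; p < n; p++) {
            System.out.println("floor(" + n + "/" + p + ") = " + (n / p)); // always 1
        }
    }
}
```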
h3. 4.1.4. Case 4: Greedy overshoot
*General formula:*
{code}
Preconditions:
1. cur < new // scale-up intent
2. divisors(N, [new, UB]) ≠ ∅ // first loop succeeds
3. Let d = min(divisors(N, [new, UB])) // smallest divisor found
4. d >> new // significant overshoot
Under EVENLY_SPREAD: d is the smallest divisor of N ≥ new within UB.
Under MAXIMIZE_UTILISATION: d is the smallest p ≥ new where
N%p==0 OR ⌊N/p⌋ < ⌊N/new⌋. But due to the integer division plateau
(when new > ⌊N/2⌋, all quotients = 1), MAXIMIZE_UTILISATION finds
the same d as EVENLY_SPREAD — directly contradicting its purpose of
minimizing resource usage.
Result: scale() returns d > new, wasting (d - new) subtasks of resources.
{code}
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 5 |
| newParallelism (after capping) | 8 |
| numSourcePartitions (N) | 10 |
| bounds \[lower, upper\] | \[5, 10\] |
- First loop: *p=8 (10%8≠0), p=9 (10%9≠0), p=10 (10%10=0)* → result = *10* (25%
overshoot).
- With MAXIMIZE\_UTILISATION: *⌊10/8⌋=⌊10/9⌋=⌊10/10⌋=1*, plateau → same result
= *10*.
- Parallelism 8 for a 10-partition Kafka source is perfectly valid (8 subtasks,
2 get 2 partitions, 6 get 1).
h2. 4.2. Scale-Down (*cur > new*)
h3. 4.2.1. Case 1: Scale-down completely cancelled (returns *cur*)
*General formula:*
{code}
Preconditions:
1. cur > new // scale-down intent
2. divisors(N, [new, cur-1]) = ∅ // no divisor between new and cur
3. N % cur == 0 // cur itself is a divisor
4. cur ≤ UB // first loop reaches cur
Result: First loop finds cur as the first divisor ≥ new.
scale() returns cur → computeScaleTargetParallelism sees
new == cur → noChange(). Scale-down silently dropped.
{code}
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 5 |
| newParallelism (after capping) | 4 |
| numSourcePartitions (N) | 10 |
| bounds \[lower, upper\] | \[1, 10\] |
- First loop: *p=4 → 10%4=2≠0*, *p=5 → 10%5=0* → result = *5* = cur →
*noChange*.
Another example (showing *new > N/2* is also affected):
- cur=9, new=7, N=9, bounds \[1, 9\]
- First loop: *p=7 (9%7≠0), p=8 (9%8≠0), p=9 (9%9=0)* → result = *9* = cur →
*noChange*.
h3. 4.2.2. Case 2: Scale-down inverted into scale-up
*General formula:*
{code}
Preconditions:
1. cur > new // scale-down intent
2. divisors(N, [new, cur]) = ∅ // no divisor at or below cur
3. Let d = min(divisors(N, [new, UB])) // first divisor found
4. d > cur // overshoots past current
Result: scale() returns d > cur →
detectBlockScaling sees scaledUp = (cur < d) = true
→ enters the SCALE-UP path. Direction completely inverted.
This is particularly likely when new > ⌊N/2⌋, because the only divisor
of N in [⌈N/2⌉, N] is typically N itself (unless N is even, in which
case N/2 is also a divisor). So the first loop jumps all the way to N.
{code}
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 8 |
| newParallelism (after capping) | 6 |
| numSourcePartitions (N) | 10 |
| bounds \[lower, upper\] | \[6, 10\] |
- First loop: *p=6 (10%6≠0), p=7 (10%7≠0), p=8 (10%8≠0), p=9 (10%9≠0), p=10
(10%10=0)* → result = *10*.
- *10 > cur(8)* → *scaledUp = true* → *scale-up path*.
h3. 4.2.3. Case 3: *MAXIMIZE_UTILISATION* does not resolve Cases 1 and 2
*General formula:*
{code}
Same structural issues as Case 3 of the scale-up section (4.1.3):
For Case 1 (scale-down cancelled): The first loop finds cur because
N%cur==0, which triggers the modulo check before MAXIMIZE_UTILISATION
is even evaluated. The mode is irrelevant.
For Case 2 (scale-down inverted): When new > ⌊N/2⌋, the integer division
plateau means ⌊N/p⌋ = ⌊N/new⌋ = 1 for all p in [new, N-1].
MAXIMIZE_UTILISATION cannot differentiate any candidate and falls through
to the same divisor (N) as EVENLY_SPREAD.
{code}
h2. 4.3. Cross-Cutting: Spurious *ScalingLimited* Warning Events
*General formula:*
{code}
Preconditions:
1. divisors(N, [new, UB]) = ∅ // first loop fails
2. fallback_result == new // rounding brings result back to new
This happens when:
p_raw = ⌊N / (q + 1)⌋
N % p_raw ≠ 0 → result = p_raw + 1 == new
Result: ScalingLimited event is emitted with "expected: X, actual: X"
→ a completely misleading warning that pollutes the event log, even
though the scaling proceeds correctly.
{code}
*Concrete example:*
- cur=2, new=3, N=7, bounds \[1, 5\]
- First loop: *p=3 (7%3≠0), p=4 (7%4≠0), p=5 (7%5≠0)* → fails.
- *q=⌊7/3⌋=2*, *p_raw=⌊7/3⌋=2*, *7%2≠0* → result = *3* = new.
- ScalingLimited warning: "expected: 3, actual: 3" ← false alarm.
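The arithmetic behind this false alarm can be checked directly; the following is a minimal reproduction of the fallback rounding, using the notation from Section 3:

```java
// Reproduces the spurious-warning arithmetic for cur=2, new=3, N=7:
// the fallback rounds back to exactly newParallelism, yet an event is emitted.
public final class SpuriousWarningDemo {
    public static void main(String[] args) {
        int n = 7, newP = 3;
        int q = n / newP;                               // 2
        int pRaw = n / (q + 1);                         // floor(7/3) = 2
        int result = (n % pRaw != 0) ? pRaw + 1 : pRaw; // 7 % 2 != 0 -> 3
        System.out.println(result == newP);             // true: "expected: 3, actual: 3"
    }
}
```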
h1. 5. Impact
- *Systematic, not isolated:* The formulas above show these issues are
determined by the divisor structure of N relative to *new*, *cur*, and *UB*.
Any partition count with sparse divisors triggers them, including all primes
and most semiprimes; such counts are extremely common in production Kafka
deployments.
- Combined with tight *VERTEX_MAX_PARALLELISM* settings, the alignment logic
can render the autoscaler completely unable to scale a vertex.
- Direction inversion (scale-up Case 2 and scale-down Case 2) can cause
resource oscillation and instability.
- *MAXIMIZE_UTILISATION* mode fails to provide any benefit in the most
problematic scenarios (when *new > ⌊N/2⌋*), despite being specifically designed
for these use cases.
- Spurious *ScalingLimited* events reduce signal-to-noise ratio in monitoring.
h1. 6. Proposed Solution
The proposed solution provides:
- *Direction safety by construction* - separate scale-up/scale-down paths with
structural guarantees.
- *Single-responsibility modes* - the old combined EVENLY_SPREAD is decomposed
into distinct building blocks.
- *Composable mode + fallback* - any mode paired with any fallback, no
combinatorial explosion.
- *Open for extension* - new modes (e.g., *ADAPTIVE_DOWNWARD_SPREAD*,
*NEAREST_SPREAD*) only need to implement the
*searchesWithinRange*/*allowsOutsideRange* predicates and an *isAligned*
branch; new fallbacks only need a *toMode()* mapping. The orchestration logic
in *scaleUp()*/*scaleDown()* and the fallback framework remain untouched.
h2. 6.1. Direction-aware *scaleUp()* / *scaleDown()* entry points
Split the single alignment code path into two direction-aware methods:
- *scaleUp()* → guarantees the result is strictly *above* *currentParallelism*,
or returns *currentParallelism* (sentinel = "no aligned value found").
- *scaleDown()* → guarantees the result is strictly *below*
*currentParallelism*, or returns *currentParallelism*.
The *scale()* method dispatches to the appropriate one based on *newParallelism
> currentParallelism* vs *newParallelism < currentParallelism*.
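A possible shape for this dispatch, sketched below with hypothetical signatures; only the method names and the "return *currentParallelism* as sentinel" convention come from this proposal, the loop bodies are illustrative (divisor search only):

```java
// Hypothetical sketch of the direction-aware entry points. Signatures and
// loop bodies are illustrative; only the sentinel convention is from the proposal.
public final class DirectionAwareScaler {

    /** Result is strictly above cur, or cur itself when nothing aligned exists. */
    static int scaleUp(int n, int cur, int target, int upperAlignLimit) {
        for (int p = target; p <= upperAlignLimit; p++) {
            if (p > cur && n % p == 0) {
                return p; // structurally guaranteed: p > cur
            }
        }
        return cur; // sentinel: no aligned value in the scale-up direction
    }

    /** Result is strictly below cur, or cur itself when nothing aligned exists. */
    static int scaleDown(int n, int cur, int target) {
        for (int p = target; p < cur; p++) { // upward search, capped below cur
            if (n % p == 0) {
                return p; // structurally guaranteed: p < cur
            }
        }
        return cur; // sentinel
    }

    static int scale(int n, int cur, int target, int upperAlignLimit) {
        if (target > cur) {
            return scaleUp(n, cur, target, upperAlignLimit);
        }
        if (target < cur) {
            return scaleDown(n, cur, target);
        }
        return cur;
    }
}
```

With this structure, the inversion from Section 4.1.2 (cur=6, new=8, N=10, UB=8) returns the sentinel 6 instead of crossing down to 5.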
h2. 6.2. Three-phase alignment algorithm per mode
Each mode's algorithm is structured into three phases (first match wins):
| Phase | Description | Active for |
| *Phase 1* (within range) | Divisor search between *currentParallelism* and
*newParallelism*. Scale-up searches downward from target; scale-down searches
upward. | *EVENLY_SPREAD*, *OPTIMIZE_RESOURCE_UTILIZATION* |
| *Phase 2* (upward outside range) | Searches upward from *newParallelism* to
*upper align limit*. Accepts exact divisors; *MAXIMIZE_UTILISATION*
additionally accepts values where per-subtask load decreases. In scale-down,
capped at *min(currentParallelism, upper align limit)* to prevent crossing. |
*MAXIMIZE_UTILISATION*, *ADAPTIVE_UPWARD_SPREAD* |
| *Phase 3* (relaxed downward fallback) | Searches downward from
*newParallelism* for the boundary where per-subtask load increases, then snaps
up to the nearest divisor-aligned value. In scale-up, a *direction guard*
rejects results ≤ *currentParallelism*. | *MAXIMIZE_UTILISATION*,
*ADAPTIVE_UPWARD_SPREAD* |
Phase 2 and Phase 3 share the same {{allowsOutsideRange(mode)}} guard and
identical logic between scale-up and scale-down. They are extracted into a
single shared method *applyOutsideRangeSearch()*, parameterized by:
- {{phase2UpperBound}} → the upper align limit for scale-up;
{{min(currentParallelism, upper align limit)}} for scale-down.
- {{isScaleUp}} → enables the direction guard in Phase 3.
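Phase 1's direction-dependent orientation can be sketched as follows (the method name and signature are assumed, not the actual proposed API):

```java
import java.util.OptionalInt;

// Illustrative Phase 1 (within-range divisor search); the scaling direction
// decides the search orientation, as described above. Names are assumed.
public final class PhaseOneSketch {

    static OptionalInt phase1(int n, int cur, int target) {
        if (target > cur) {
            // Scale-up: search downward from the target toward cur (exclusive).
            for (int p = target; p > cur; p--) {
                if (n % p == 0) return OptionalInt.of(p);
            }
        } else {
            // Scale-down: search upward from the target toward cur (exclusive).
            for (int p = target; p < cur; p++) {
                if (n % p == 0) return OptionalInt.of(p);
            }
        }
        return OptionalInt.empty(); // no divisor within range: Phase 2/3 or block
    }
}
```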
h2. 6.3. Search ranges diagram
{code:none}
PARALLELISM ALIGNMENT — SEARCH RANGES PER MODE
══════════════════════════════════════════════════════════════════════════════════════
Complete number line with all key barriers:
lower align new ↓ current new ↑ upper align
limit¹ target² parallelism target³ limit⁴
│ │ │ │ │
────┴───────────────┴───────────────┴───────────────┴───────────────┴──►
│ │ │ │ │
│ ◄──── scale-down zone ─────► │ ◄────── scale-up zone ─────► │
¹ parallelismLowerLimit — floor for alignment results
² newParallelism when scaling down
³ newParallelism when scaling up
⁴ upperBoundForAlignment = min(N, min(maxParallelism, parallelismUpperLimit))
N = numKeyGroupsOrPartitions (key groups or source partitions)
Legend: ◄ = downward search direction ► = upward search direction
├──────┤ = search range P1/P2/P3 = phase (first match wins)
═══════════════════════════════════════════════════════════════════════════════════════════
SCALE-UP (newParallelism > currentParallelism)
═══════════════════════════════════════════════════════════════════════════════════════════
currentP new(↑) upper align
limit
│ │ │
─────┼─────────────────┼──────────────┼──►
EVENLY_SPREAD P1: ├────────◄────────┤
N % p == 0
OPTIMIZE_RESOURCE_ P1: ├────────◄────────┤
p ≤ N
UTILIZATION
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
· · · ·
MAXIMIZE_UTILISATION P2: ├────────►─────┤
N%p==0 ∨ N/p<N/new
P3: ├────────◄────────┤
snap to load boundary
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
· · · ·
ADAPTIVE_UPWARD_ P2: ├────────►─────┤
N % p == 0
SPREAD P3: ├────────◄────────┤
snap to load boundary
P3 direction guard: result must be > currentP,
otherwise discarded
═══════════════════════════════════════════════════════════════════════════════════════════
SCALE-DOWN (newParallelism < currentParallelism)
═══════════════════════════════════════════════════════════════════════════════════════════
    lower align          new(↓)               currentP
       limit
│ │ │
─────┼────────────────┼────────────────────┼──►
EVENLY_SPREAD P1: ├────────►───────────┤
N % p == 0
OPTIMIZE_RESOURCE_ P1: ├────────►───────────┤
p ≤ N
UTILIZATION
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
· · · ·
MAXIMIZE_UTILISATION P2: ├────────►───────────┤⁵
N%p==0 ∨ N/p<N/new
P3: ├───────◄────────┤
snap to load boundary
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
· · · ·
ADAPTIVE_UPWARD_ P2: ├────────►───────────┤⁵
N % p == 0
SPREAD P3: ├───────◄────────┤
snap to load boundary
⁵ Phase 2 upper bound capped at min(currentP, upper align limit) to prevent
direction inversion
Phase 3 results are inherently ≤ new(↓) < currentP → no direction guard
needed
═══════════════════════════════════════════════════════════════════════════════════════════
MODE-THEN-FALLBACK FLOW (applies to both scale-up and scale-down)
═══════════════════════════════════════════════════════════════════════════════════════════
┌─────────────────────┐ found? ┌──────────┐
│ Run primary mode's ├──── YES ──────►│ Return │
│ full P1 → P2 → P3 │ │ result │
└─────────┬───────────┘ └──────────┘
│ NO (sentinel)
▼
┌─────────────────────┐ found? ┌──────────┐
│ Run fallback mode's ├──── YES ──────►│ Return │
│ full P1 → P2 → P3 │ │ result │
└─────────┬───────────┘ └──────────┘
│ NO (sentinel)
▼
┌─────────────────────────────────────────────────┐
│ WARN log (compliant vs last-resort check) │
│ Emit SCALING_LIMITED event │
│ Return currentParallelism (scaling blocked) │
└─────────────────────────────────────────────────┘
{code}
h2. 6.4. New modes: *EVENLY_SPREAD* (redefined) and
*OPTIMIZE_RESOURCE_UTILIZATION*
The original implementation offered only two modes (*EVENLY_SPREAD* and
*MAXIMIZE_UTILISATION*), but *EVENLY_SPREAD* was misleadingly named: it
actually ran both an upward outside-range search *and* a downward relaxed
fallback (the same three-phase algorithm now explicitly named Phase 1+2+3),
making it behave like a combined strategy rather than a strict divisor-only
mode. This made it impossible for users to select a truly conservative
alignment that blocks scaling when no divisor exists in the safe range.
The new design introduces two additional modes to cover the full spectrum of
alignment strictness:
| Mode | Strictness | Search behavior | When to use |
| *EVENLY_SPREAD* (redefined) | Strict | Within-range only (Phase 1). Accepts
only exact divisors of N. Blocks scaling if none found. | Data-sensitive
workloads where uneven distribution is unacceptable (e.g., windowed
aggregations, partitioned state). |
| *OPTIMIZE_RESOURCE_UTILIZATION* (new) | Minimal | Within-range only (Phase
1). Accepts any *p ≤ N* (every subtask has at least one item). | Cost-sensitive
workloads where resource efficiency matters more than even distribution. The
autoscaler's computed target is used almost as-is. |
Together with the existing modes, the full design space is:
| Mode | Alignment strictness | Phases used | Search direction |
| *EVENLY_SPREAD* | Strict (divisor only) | P1 | Within range only |
| *OPTIMIZE_RESOURCE_UTILIZATION* | Minimal (*p ≤ N*) | P1 | Within range only |
| *MAXIMIZE_UTILISATION* | Relaxed (*N/p < N/new*) | P2 + P3 | Upward outside
range + downward fallback |
| *ADAPTIVE_UPWARD_SPREAD* | Combined (divisor → relaxed fallback) | P2 + P3 |
Upward outside range + downward fallback |
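The alignment strictness column can be read as simple acceptance predicates; the sketch below is illustrative (method names are hypothetical, only the conditions come from the table):

```java
// Illustrative acceptance predicates for the alignment conditions in the
// table above. Method names are hypothetical.
public final class ModePredicates {

    /** EVENLY_SPREAD: exact divisor only. */
    static boolean evenlySpread(int n, int p) {
        return n % p == 0;
    }

    /** OPTIMIZE_RESOURCE_UTILIZATION: every subtask gets at least one item. */
    static boolean optimizeResourceUtilization(int n, int p) {
        return p <= n;
    }

    /** MAXIMIZE_UTILISATION: exact divisor, or per-subtask load decreases. */
    static boolean maximizeUtilisation(int n, int p, int newP) {
        return n % p == 0 || n / p < n / newP;
    }
}
```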
h2. 6.5. *ADAPTIVE_UPWARD_SPREAD* mode (rename + new default)
The old *EVENLY_SPREAD* behavior, which combined a divisor search with an
implicit *MAXIMIZE_UTILISATION*-style fallback and a downward relaxed search,
is captured by the new *ADAPTIVE_UPWARD_SPREAD* mode. This name explicitly
conveys that it searches *above* the target and adapts between strict and
relaxed alignment. It is now the *default mode*, preserving backward
compatibility with the old *EVENLY_SPREAD* behavior while giving it a more
accurate name.
h2. 6.6. Composable fallback configuration
With properly separated modes, strict modes like *EVENLY_SPREAD* will block
scaling when no divisor exists in the safe range. Previously, the relaxed
fallback was implicitly baked into *EVENLY_SPREAD* with no way to control it.
The new design introduces a separate fallback configuration:
{code:java}
job.autoscaler.scaling.key-group.partitions.adjust.mode.fallback
{code}
*Why this is needed:*
- *Separation of concerns:* The primary mode defines the *preferred* alignment
strategy; the fallback defines what to do when it fails. Mixing both into a
single enum creates combinatorial explosion and unclear semantics.
- *User control:* Different workloads need different trade-offs. A user might
want strict *EVENLY_SPREAD* alignment as the primary strategy but accept
*OPTIMIZE_RESOURCE_UTILIZATION* as a fallback to avoid blocking scaling
entirely. Without a fallback config, the only options are "strict and block" or
"relaxed from the start."
- *Extensibility:* New modes can be added without creating N × M combined enum
values. Any mode can be paired with any fallback.
*Enum:* *KeyGroupOrPartitionsAdjustFallback* with values:
| Value | Behavior |
| *DEFAULT* | Uses the mode's built-in fallback. Only *ADAPTIVE_UPWARD_SPREAD*
has a built-in fallback (*MAXIMIZE_UTILISATION*). All others resolve to *NONE*.
|
| *NONE* | Blocks scaling when no aligned value is found. |
| *ADAPTIVE_UPWARD_SPREAD* | Delegates to the full *ADAPTIVE_UPWARD_SPREAD*
algorithm as a second pass. |
| *MAXIMIZE_UTILISATION* | Falls back to the relaxed *MAXIMIZE_UTILISATION*
search. |
| *OPTIMIZE_RESOURCE_UTILIZATION* | Falls back to accepting any value where *p
≤ N*. |
The *toMode()* method on the fallback enum maps each value to the corresponding
*KeyGroupOrPartitionsAdjustMode*, enabling the same *applyScaleUpMode* /
*applyScaleDownMode* methods to be reused for both primary and fallback passes
→ no code duplication.
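One way *toMode()* could look, sketched with a stand-in Mode enum (the enum values come from this proposal; everything else is illustrative):

```java
import java.util.Optional;

// Illustrative sketch of the fallback-to-mode mapping described above.
// The Mode enum below is a stand-in for KeyGroupOrPartitionsAdjustMode.
public final class FallbackSketch {

    enum Mode { EVENLY_SPREAD, OPTIMIZE_RESOURCE_UTILIZATION,
                MAXIMIZE_UTILISATION, ADAPTIVE_UPWARD_SPREAD }

    enum Fallback {
        DEFAULT, NONE, ADAPTIVE_UPWARD_SPREAD,
        MAXIMIZE_UTILISATION, OPTIMIZE_RESOURCE_UTILIZATION;

        /** Resolves this fallback to a mode; empty means "block scaling". */
        Optional<Mode> toMode(Mode primary) {
            switch (this) {
                case DEFAULT:
                    // Only ADAPTIVE_UPWARD_SPREAD has a built-in fallback.
                    return primary == Mode.ADAPTIVE_UPWARD_SPREAD
                            ? Optional.of(Mode.MAXIMIZE_UTILISATION)
                            : Optional.empty();
                case NONE:
                    return Optional.empty();
                case ADAPTIVE_UPWARD_SPREAD:
                    return Optional.of(Mode.ADAPTIVE_UPWARD_SPREAD);
                case MAXIMIZE_UTILISATION:
                    return Optional.of(Mode.MAXIMIZE_UTILISATION);
                default:
                    return Optional.of(Mode.OPTIMIZE_RESOURCE_UTILIZATION);
            }
        }
    }
}
```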
*Example configurations:*
| Use case | Mode | Fallback | Behavior |
| Backward-compatible default | *ADAPTIVE_UPWARD_SPREAD* | *DEFAULT* →
*MAXIMIZE_UTILISATION* | Tries divisor above target, falls back to relaxed
load-decrease search |
| Strict with graceful degradation | *EVENLY_SPREAD* |
*OPTIMIZE_RESOURCE_UTILIZATION* | Tries exact divisor in range; if none,
accepts any *p ≤ N* |
| Strict, block on failure | *EVENLY_SPREAD* | *NONE* | Exact divisor or
nothing — scaling blocked if no divisor in range |
| Maximum resource efficiency | *OPTIMIZE_RESOURCE_UTILIZATION* | *NONE* |
Always accepts the autoscaler's target (as long as *p ≤ N*) |
| Strict with upward fallback | *EVENLY_SPREAD* | *ADAPTIVE_UPWARD_SPREAD* |
Tries divisor in range; if none, searches above target with full adaptive
algorithm |
h2. 6.7. Proposed Scaling Flow
{code:none}
┌──────────────────────────────────┐
│ applyScaleUpMode(primaryMode) │ ← self-contained: Phase 1 + 2 + 3
│ returns result or curP │
└──────────┬───────────────────────┘
│ result == curP?
▼ yes
┌──────────────────────────────────┐
│ applyScaleUpMode(fallbackMode) │ ← same method, different mode
│ returns result or curP │
└──────────┬───────────────────────┘
│ result == curP?
▼ yes
emit SCALING_LIMITED event
return curP
{code}
h2. 6.8. Compliance-aware warning logging
When both the primary mode and fallback fail to find an aligned value, a *WARN*
log distinguishes between two cases:
- *Compliant:* *currentParallelism* is itself aligned with the mode/fallback;
there is simply no *better* aligned value in the scaling direction.
- *Last resort:* *currentParallelism* does *not* comply with the mode or
fallback; it is returned purely to preserve the scaling direction.
This helps operators diagnose whether the autoscaler is stuck at a good value
(just no room to scale) vs stuck at a bad value (alignment constraint is too
restrictive for the current state).
was:
h1. 1. Summary
The parallelism alignment logic in *JobVertexScaler#scale* (responsible for
adjusting computed parallelism to align with {*}numKeyGroupsOrPartitions{*})
contains several correctness issues that can cause scaling decisions to be
silently cancelled, inverted in direction, or excessively overshooting the
target. These problems are most impactful for Kafka source vertices with
partition counts that are prime or have few small divisors (e.g., 3, 5, 7, 11,
13), which are very common in production deployments.
Additionally, the existing two modes ({*}EVENLY_SPREAD{*} and
{*}MAXIMIZE_UTILISATION{*}) do not offer sufficient granularity.
{*}EVENLY_SPREAD{*}, despite its name, actually runs both an upward
outside-range search and a downward relaxed fallback (making it behave like a
combined mode), while *MAXIMIZE_UTILISATION* is purely upward. There is no way
to configure a truly strict within-range divisor-only mode, nor a mode that
simply accepts the computed target as-is when every subtask has work.
Furthermore, there is no mechanism to compose modes (e.g., try a strict
alignment first, then fall back to a more relaxed strategy).
h1. 2. Root Cause
The alignment logic consists of two loops:
1. *First loop (upward search,
[L554–L564|https://github.com/apache/flink-kubernetes-operator/blob/56af6bf3a94494923497e709faf29fb3749cd45f/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L554]):*
Starting from {*}newParallelism{*}, searches upward to
*upperBoundForAlignment* for a value *p* such that *N % p == 0* (or satisfies
the *MAXIMIZE_UTILISATION* condition).
2. *Fallback loop (downward search,
[L569–L577|https://github.com/apache/flink-kubernetes-operator/blob/56af6bf3a94494923497e709faf29fb3749cd45f/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L570]):*
If the first loop fails, searches downward from *newParallelism* toward 0 for
the next "partition boundary" where the integer division *N / p* increases.
Neither loop is aware of the *scaling direction* (scale-up vs. scale-down) or
the {*}current parallelism{*}. This causes the following systematic issues.
h1. 3. Notation
Throughout this report:
- *N* = *numKeyGroupsOrPartitions*
- *cur* = *currentParallelism*
- *new* = *newParallelism* (after capping by bounds at L536)
- *UB* = *upperBoundForAlignment* = *min(N, upperBound)*
- *q* = *⌊N / new⌋* (integer division)
- *divisors(N, [a, b])* = the set of divisors of N in the range [a, b]
When the first loop fails, the fallback loop result is computed as:
{code}
q = ⌊N / new⌋
p_raw = ⌊N / (q + 1)⌋ // largest p where ⌊N/p⌋ jumps above q
result = (N % p_raw ≠ 0) ? p_raw + 1 : p_raw
result = max(result, parallelismLowerLimit)
{code}
h1. 4. Problem Cases
h2. 4.1. Scale-Up (*cur < new*)
h3. 4.1.1. Case 1: Scale-up completely cancelled (returns *cur*)
*General formula:*
{code}
Preconditions:
1. cur < new // scale-up intent
2. divisors(N, [new, UB]) = ∅ // first loop fails
3. fallback_result == cur // fallback lands on current
Result: scale() returns cur → computeScaleTargetParallelism sees
new == cur → noChange(). Scale-up silently dropped.
{code}
This is *not* restricted to *cur < N/2*. It applies whenever the fallback
rounding lands exactly on *cur*, which depends on the divisor structure of N
relative to *new*.
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 1 |
| newParallelism (after capping) | 2 |
| numSourcePartitions (N) | 3 |
| bounds \[lower, upper\] | \[1, 2\] |
- *q = ⌊3/2⌋ = 1*, *p_raw = ⌊3/2⌋ = 1*, *3 % 1 = 0* → result = *1* = cur →
*noChange*.
- A Kafka source with 3 partitions at parallelism 2 is perfectly valid.
Another example (showing *cur > N/2* is also affected):
- cur=3, new=4, N=5, bounds \[1, 4\]
- *q = ⌊5/4⌋ = 1*, *p_raw = ⌊5/2⌋ = 2*, *5 % 2 ≠ 0* → result = *3* = cur →
*noChange*.
h3. 4.1.2. Case 2: Scale-up inverted into scale-down
*General formula:*
{code}
Preconditions:
1. cur < new // scale-up intent
2. divisors(N, [new, UB]) = ∅ // first loop fails
3. fallback_result < cur // fallback drops BELOW current
Result: scale() returns fallback_result < cur →
detectBlockScaling sees scaledUp = (cur < fallback_result) = false
→ enters the SCALE-DOWN path. Direction completely inverted.
{code}
This is particularly likely when *cur > ⌊N/2⌋*, because divisors of N tend to
cluster at the low end (*≤ N/2*) and at N itself. When UB prevents reaching N,
the fallback jumps to a divisor well below *cur*.
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 6 |
| newParallelism (after capping) | 8 |
| numSourcePartitions (N) | 10 |
| bounds \[lower, upper\] | \[1, 8\] |
- First loop: *p=8 → 10%8=2≠0* → no match.
- *q = ⌊10/8⌋ = 1*, *p_raw = ⌊10/2⌋ = 5*, *10 % 5 = 0* → result = *5*.
- *5 < cur(6)* → *detectBlockScaling*: *scaledUp = (6 < 5) = false* →
*scale-down path*.
h3. 4.1.3. Case 3: *MAXIMIZE_UTILISATION* mode does not resolve Cases 1 and 2
*General formula:*
{code}
The MAXIMIZE_UTILISATION condition in the first loop (L559-L561) is: [N / p] <
[N / new]
This has two structural flaws:
A) Self-comparison: At p = new (first iteration), this is x < x → always FALSE.
MAXIMIZE_UTILISATION can never accept newParallelism itself.
B) Integer division plateau: When new > ⌊N/2⌋, then ⌊N/p⌋ = 1 for ALL p
in [⌈N/2⌉, N-1]. The condition becomes 1 < 1 → FALSE across the entire
search range. MAXIMIZE_UTILISATION becomes equivalent to EVENLY_SPREAD.
Therefore: All of Cases 1-2 reproduce identically under MAXIMIZE_UTILISATION
whenever new > ⌊N/2⌋.
{code}
Concrete example (Case 1 under MAXIMIZE\_UTILISATION):
- cur=1, new=2, N=3, bounds \[1, 2\]:
- *p=2 → 3%2≠0*. MAXIMIZE\_UTILISATION: *⌊3/2⌋=1 < ⌊3/2⌋=1? NO*.
- Falls through to same fallback → returns *1* → *noChange*.
h3. 4.1.4. Case 4: Greedy overshoot
*General formula:*
{code}
Preconditions:
1. cur < new // scale-up intent
2. divisors(N, [new, UB]) ≠ ∅ // first loop succeeds
3. Let d = min(divisors(N, [new, UB])) // smallest divisor found
4. d >> new // significant overshoot
Under EVENLY_SPREAD: d is the smallest divisor of N ≥ new within UB.
Under MAXIMIZE_UTILISATION: d is the smallest p ≥ new where
N%p==0 OR ⌊N/p⌋ < ⌊N/new⌋. But due to the integer division plateau
(when new > ⌊N/2⌋, all quotients = 1), MAXIMIZE_UTILISATION finds
the same d as EVENLY_SPREAD — directly contradicting its purpose of
minimizing resource usage.
Result: scale() returns d > new, wasting (d - new) subtasks of resources.
{code}
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 5 |
| newParallelism (after capping) | 8 |
| numSourcePartitions (N) | 10 |
| bounds \[lower, upper\] | \[5, 10\] |
- First loop: *p=8 (10%8≠0), p=9 (10%9≠0), p=10 (10%10=0)* → result = *10* (25%
overshoot).
- With MAXIMIZE\_UTILISATION: *⌊10/8⌋=⌊10/9⌋=⌊10/10⌋=1*, plateau → same result
= *10*.
- Parallelism 8 for a 10-partition Kafka source is perfectly valid (8 subtasks,
2 get 2 partitions, 6 get 1).
h2. 4.2. Scale-Down (*cur > new*)
h3. Case 1: Scale-down completely cancelled (returns *cur*)
*General formula:*
{code}
Preconditions:
1. cur > new // scale-down intent
2. divisors(N, [new, cur-1]) = ∅ // no divisor between new and cur
3. N % cur == 0 // cur itself is a divisor
4. cur ≤ UB // first loop reaches cur
Result: First loop finds cur as the first divisor ≥ new.
scale() returns cur → computeScaleTargetParallelism sees
new == cur → noChange(). Scale-down silently dropped.
{code}
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 5 |
| newParallelism (after capping) | 4 |
| numSourcePartitions (N) | 10 |
| bounds \[lower, upper\] | \[1, 10\] |
- First loop: *p=4 → 10%4=2≠0*, *p=5 → 10%5=0* → result = *5* = cur →
*noChange*.
Another example (showing *new > N/2* is also affected):
- cur=9, new=7, N=9, bounds \[1, 9\]
- First loop: *p=7 (9%7≠0), p=8 (9%8≠0), p=9 (9%9=0)* → result = *9* = cur →
*noChange*.
h3. Case 2: Scale-down inverted into scale-up
*General formula:*
{code}
Preconditions:
1. cur > new // scale-down intent
2. divisors(N, [new, cur]) = ∅ // no divisor at or below cur
3. Let d = min(divisors(N, [new, UB])) // first divisor found
4. d > cur // overshoots past current
Result: scale() returns d > cur →
detectBlockScaling sees scaledUp = (cur < d) = true
→ enters the SCALE-UP path. Direction completely inverted.
This is particularly likely when new > ⌊N/2⌋, because the only divisor
of N in [⌈N/2⌉, N] is typically N itself (unless N is even, in which
case N/2 is also a divisor). So the first loop jumps all the way to N.
{code}
*Concrete example:*
|| Parameter || Value ||
| currentParallelism | 8 |
| newParallelism (after capping) | 6 |
| numSourcePartitions (N) | 10 |
| bounds \[lower, upper\] | \[6, 10\] |
- First loop: *p=6 (10%6≠0), p=7 (10%7≠0), p=8 (10%8≠0), p=9 (10%9≠0), p=10
(10%10=0)* → result = *10*.
- *10 > cur(8)* → *scaledUp = true* → *scale-up path*.
h3. 4.2.3. Case 7: *MAXIMIZE_UTILISATION* does not resolve Cases 5 and 6
*General formula:*
{code}
Same structural issues as Case 3:
For Case 5: The first loop finds cur because N%cur==0, which triggers
the modulo check before MAXIMIZE_UTILISATION is even evaluated.
The mode is irrelevant.
For Case 6: When new > ⌊N/2⌋, the integer division plateau means
⌊N/p⌋ = ⌊N/new⌋ = 1 for all p in [new, N-1]. MAXIMIZE_UTILISATION
cannot differentiate any candidate and falls through to the same
divisor (N) as EVENLY_SPREAD.
{code}
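The integer-division plateau can be checked directly. This sketch evaluates the *MAXIMIZE_UTILISATION* acceptance condition exactly as described above (hypothetical helper, not the operator's code):

```java
// Illustrative check of the MAXIMIZE_UTILISATION acceptance condition.
public class PlateauDemo {

    /** Accepts p if it is an exact divisor OR strictly lowers per-subtask load. */
    static boolean accepts(int n, int p, int newParallelism) {
        return n % p == 0 || n / p < n / newParallelism;
    }

    public static void main(String[] args) {
        // N=10, new=8: ⌊10/p⌋ == 1 for p in [8, 9], so only the divisor 10 passes.
        for (int p = 8; p <= 10; p++) {
            System.out.println(p + " -> " + accepts(10, p, 8));
        }
        // prints: 8 -> false, 9 -> false, 10 -> true, identical to EVENLY_SPREAD
    }
}
```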
h2. 4.3. Cross-Cutting: Spurious *ScalingLimited* Warning Events
*General formula:*
{code}
Preconditions:
1. divisors(N, [new, UB]) = ∅ // first loop fails
2. fallback_result == new // rounding brings result back to new
This happens when:
p_raw = ⌊N / (q + 1)⌋
N % p_raw ≠ 0 → result = p_raw + 1 == new
Result: ScalingLimited event is emitted with "expected: X, actual: X"
→ a completely misleading warning that pollutes the event log, even
though the scaling proceeds correctly.
{code}
*Concrete example:*
- cur=2, new=3, N=7, bounds \[1, 5\]
- First loop: *p=3 (7%3≠0), p=4 (7%4≠0), p=5 (7%5≠0)* → fails.
- *q=⌊7/3⌋=2*, *p_raw=⌊7/(q+1)⌋=⌊7/3⌋=2*, *7%2≠0* → result = *3* = new.
- ScalingLimited warning: "expected: 3, actual: 3" ← false alarm.
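The fallback rounding from Section 3 can be traced with a short sketch (illustrative names), which reproduces the false alarm as well as Cases 1 and 2:

```java
// Illustrative implementation of the fallback-loop formula from Section 3.
public class FallbackDemo {

    static int fallbackResult(int n, int newParallelism, int lowerLimit) {
        int q = n / newParallelism;                       // ⌊N / new⌋
        int pRaw = n / (q + 1);                           // largest p where ⌊N/p⌋ > q
        int result = (n % pRaw != 0) ? pRaw + 1 : pRaw;   // snap up unless exact divisor
        return Math.max(result, lowerLimit);
    }

    public static void main(String[] args) {
        // 4.3 example: N=7, new=3 → q=2, p_raw=2, 7%2≠0 → 3 == new (spurious warning).
        System.out.println(fallbackResult(7, 3, 1));  // 3
        // 4.1.1 example: N=3, new=2, cur=1 → lands on cur, cancelling the scale-up.
        System.out.println(fallbackResult(3, 2, 1));  // 1
        // 4.1.2 example: N=10, new=8, cur=6 → 5 < cur, inverting the direction.
        System.out.println(fallbackResult(10, 8, 1)); // 5
    }
}
```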
h1. 5. Impact
- *Systematic, not isolated:* The formulas above show these issues are
determined by the divisor structure of N relative to *new*, *cur*, and *UB*.
Any partition count with sparse divisors triggers them — this includes all
primes and most semi-primes, which are extremely common in production Kafka
deployments.
- Combined with tight *VERTEX_MAX_PARALLELISM* settings, the alignment logic
can render the autoscaler completely unable to scale a vertex.
- Direction inversion (Cases 2 and 6) can cause resource oscillation and
instability.
- *MAXIMIZE_UTILISATION* mode fails to provide any benefit in the most
problematic scenarios (when *new > ⌊N/2⌋*), despite being specifically designed
for these use cases.
- Spurious *ScalingLimited* events reduce signal-to-noise ratio in monitoring.
h1. 6. Proposed Solution
The proposed solution provides:
- *Direction safety by construction* - separate scale-up/scale-down paths with
structural guarantees.
- *Single-responsibility modes* - the old combined EVENLY_SPREAD is decomposed
into distinct building blocks.
- *Composable mode + fallback* - any mode paired with any fallback, no
combinatorial explosion.
- *Open for extension* - new modes (e.g., *ADAPTIVE_DOWNWARD_SPREAD*,
*NEAREST_SPREAD*) only need to implement the
*searchesWithinRange*/*allowsOutsideRange* predicates and an *isAligned*
branch; new fallbacks only need a *toMode()* mapping. The orchestration logic
in *scaleUp()*/*scaleDown()* and the fallback framework remain untouched.
h2. 6.1. Direction-aware *scaleUp()* / *scaleDown()* entry points
Split the single alignment code path into two direction-aware methods:
- *scaleUp()* → guarantees the result is strictly *above* *currentParallelism*,
or returns *currentParallelism* (sentinel = "no aligned value found").
- *scaleDown()* → guarantees the result is strictly *below*
*currentParallelism*, or returns *currentParallelism*.
The *scale()* method dispatches to the appropriate one based on *newParallelism
> currentParallelism* vs *newParallelism < currentParallelism*.
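A sketch of the proposed dispatch, using a plain divisor search to stand in for the per-mode alignment (hypothetical method names; the real methods would run the three-phase algorithm):

```java
// Illustrative direction-aware dispatch: each path structurally guarantees
// it cannot cross currentParallelism, returning it as a sentinel otherwise.
public class DirectionAwareScalerSketch {

    /** Scale-up: first divisor of n in [target, upperBound] above cur; sentinel = cur. */
    static int scaleUp(int n, int cur, int target, int upperBound) {
        for (int p = target; p <= upperBound; p++) {
            if (n % p == 0 && p > cur) {
                return p;
            }
        }
        return cur; // sentinel: no aligned value strictly above cur
    }

    /** Scale-down: divisor search that never reaches cur; sentinel = cur. */
    static int scaleDown(int n, int cur, int target) {
        for (int p = target; p < cur; p++) {
            if (n % p == 0) {
                return p;
            }
        }
        return cur; // sentinel: no aligned value strictly below cur
    }

    static int scale(int n, int cur, int target, int upperBound) {
        if (target > cur) return scaleUp(n, cur, target, upperBound);
        if (target < cur) return scaleDown(n, cur, target);
        return cur;
    }

    public static void main(String[] args) {
        // Old Case 6 input (cur=8, new=6, N=10): instead of inverting to 10,
        // the direction-aware path returns the sentinel.
        System.out.println(scale(10, 8, 6, 10)); // 8 (blocked, not inverted)
    }
}
```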
h2. 6.2. Three-phase alignment algorithm per mode
Each mode's algorithm is structured into three phases (first match wins):
|| Phase || Description || Active for ||
| *Phase 1* (within range) | Divisor search between *currentParallelism* and *newParallelism*. Scale-up searches downward from target; scale-down searches upward. | *EVENLY_SPREAD*, *OPTIMIZE_RESOURCE_UTILIZATION* |
| *Phase 2* (upward outside range) | Searches upward from *newParallelism* to *upper align limit*. Accepts exact divisors; *MAXIMIZE_UTILISATION* additionally accepts values where per-subtask load decreases. In scale-down, capped at *min(currentParallelism, upper align limit)* to prevent crossing. | *MAXIMIZE_UTILISATION*, *ADAPTIVE_UPWARD_SPREAD* |
| *Phase 3* (relaxed downward fallback) | Searches downward from *newParallelism* for the boundary where per-subtask load increases, then snaps up to the nearest divisor-aligned value. In scale-up, a *direction guard* rejects results ≤ *currentParallelism*. | *MAXIMIZE_UTILISATION*, *ADAPTIVE_UPWARD_SPREAD* |
Phase 2 and Phase 3 share the same *allowsOutsideRange(mode)* guard and
identical logic between scale-up and scale-down. They are extracted into a
single shared method *applyOutsideRangeSearch()*, parameterized by:
- *phase2UpperBound* → *upper align limit* for scale-up;
*min(currentParallelism, upper align limit)* for scale-down.
- *isScaleUp* → enables the direction guard in Phase 3.
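A condensed sketch of the shared method, under the parameter names proposed above (illustrative, not the final implementation):

```java
// Illustrative Phase 2 + Phase 3 search shared by scale-up and scale-down.
public class OutsideRangeSearchSketch {

    /**
     * phase2UpperBound is the upper align limit for scale-up and
     * min(currentParallelism, upper align limit) for scale-down;
     * -1 means "no aligned value found".
     */
    static int applyOutsideRangeSearch(
            int n, int target, int phase2UpperBound, boolean isScaleUp, int cur) {
        // Phase 2: upward exact-divisor search, capped so scale-down cannot cross cur.
        for (int p = target; p <= phase2UpperBound; p++) {
            if (n % p == 0) {
                return p;
            }
        }
        // Phase 3: relaxed downward fallback, snapping up at the load boundary.
        int q = n / target;
        int pRaw = n / (q + 1);
        int result = (n % pRaw != 0) ? pRaw + 1 : pRaw;
        // Direction guard: on scale-up, never accept a result at or below cur.
        if (isScaleUp && result <= cur) {
            return -1;
        }
        return result;
    }

    public static void main(String[] args) {
        // Scale-down cur=8, new=6, N=10: Phase 2 capped at min(8, UB) finds nothing,
        // Phase 3 yields 5, a genuine scale-down instead of an inversion to 10.
        System.out.println(applyOutsideRangeSearch(10, 6, 8, false, 8)); // 5
        // Scale-up cur=1, new=2, N=3: Phase 3 would yield 1 == cur; guard rejects it.
        System.out.println(applyOutsideRangeSearch(3, 2, 2, true, 1));   // -1
    }
}
```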
h2. 6.3. Search ranges diagram
{code:none}
PARALLELISM ALIGNMENT — SEARCH RANGES PER MODE
══════════════════════════════════════════════════════════════════════════════════════
Complete number line with all key barriers:
      lower align      new ↓           current         new ↑          upper align
      limit¹           target²         parallelism     target³        limit⁴
        │               │               │               │               │
    ────┴───────────────┴───────────────┴───────────────┴───────────────┴──►
        │               │               │               │               │
        │ ◄──── scale-down zone ─────►  │ ◄────── scale-up zone ─────►  │

¹ parallelismLowerLimit — floor for alignment results
² newParallelism when scaling down
³ newParallelism when scaling up
⁴ upperBoundForAlignment = min(N, min(maxParallelism, parallelismUpperLimit))
  N = numKeyGroupsOrPartitions (key groups or source partitions)

Legend:  ◄ = downward search direction    ► = upward search direction
         ├──────┤ = search range          P1/P2/P3 = phase (first match wins)
═══════════════════════════════════════════════════════════════════════════════
 SCALE-UP (newParallelism > currentParallelism)
═══════════════════════════════════════════════════════════════════════════════
                           currentP          new(↑)        upper align
                                                           limit
                           │                 │              │
                      ─────┼─────────────────┼──────────────┼──►
EVENLY_SPREAD         P1:  ├────────◄────────┤
                                N % p == 0
OPTIMIZE_RESOURCE_    P1:  ├────────◄────────┤
UTILIZATION                       p ≤ N
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
MAXIMIZE_UTILISATION  P2:                    ├────────►─────┤
                                          N%p==0 ∨ N/p<N/new
                      P3:  ├────────◄────────┤
                              snap to load boundary
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
ADAPTIVE_UPWARD_      P2:                    ├────────►─────┤
                                              N % p == 0
SPREAD                P3:  ├────────◄────────┤
                              snap to load boundary
                      P3 direction guard: result must be > currentP,
                      otherwise discarded
═══════════════════════════════════════════════════════════════════════════════
 SCALE-DOWN (newParallelism < currentParallelism)
═══════════════════════════════════════════════════════════════════════════════
                           lower align      new(↓)           currentP
                           limit
                           │                │                    │
                      ─────┼────────────────┼────────────────────┼──►
EVENLY_SPREAD         P1:                   ├────────►───────────┤
                                                 N % p == 0
OPTIMIZE_RESOURCE_    P1:                   ├────────►───────────┤
UTILIZATION                                        p ≤ N
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
MAXIMIZE_UTILISATION  P2:                   ├────────►───────────┤⁵
                                             N%p==0 ∨ N/p<N/new
                      P3:  ├───────◄────────┤
                              snap to load boundary
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
ADAPTIVE_UPWARD_      P2:                   ├────────►───────────┤⁵
                                                 N % p == 0
SPREAD                P3:  ├───────◄────────┤
                              snap to load boundary

⁵ Phase 2 upper bound capped at min(currentP, upper align limit) to prevent
  direction inversion
  Phase 3 results are inherently ≤ new(↓) < currentP → no direction guard needed
═══════════════════════════════════════════════════════════════════════════════════════════
MODE-THEN-FALLBACK FLOW (applies to both scale-up and scale-down)
═══════════════════════════════════════════════════════════════════════════════════════════
┌─────────────────────┐   found?   ┌──────────┐
│ Run primary mode's  ├─── YES ───►│  Return  │
│ full P1 → P2 → P3   │            │  result  │
└─────────┬───────────┘            └──────────┘
          │ NO (sentinel)
          ▼
┌─────────────────────┐   found?   ┌──────────┐
│ Run fallback mode's ├─── YES ───►│  Return  │
│ full P1 → P2 → P3   │            │  result  │
└─────────┬───────────┘            └──────────┘
          │ NO (sentinel)
          ▼
┌─────────────────────────────────────────────────┐
│ WARN log (compliant vs last-resort check)       │
│ Emit SCALING_LIMITED event                      │
│ Return currentParallelism (scaling blocked)     │
└─────────────────────────────────────────────────┘
{code}
h2. 6.4. New modes: *EVENLY_SPREAD* (redefined) and *OPTIMIZE_RESOURCE_UTILIZATION*
The original implementation offered only two modes (*EVENLY_SPREAD* and
*MAXIMIZE_UTILISATION*), but *EVENLY_SPREAD* was misleadingly named: it
actually ran both an upward outside-range search *and* a downward relaxed
fallback (the same three-phase algorithm now explicitly named Phase 1+2+3),
making it behave like a combined strategy rather than a strict divisor-only
mode. This made it impossible for users to select a truly conservative
alignment that blocks scaling when no divisor exists in the safe range.
The new design introduces two additional modes to cover the full spectrum of
alignment strictness:
|| Mode || Strictness || Search behavior || When to use ||
| *EVENLY_SPREAD* (redefined) | Strict | Within-range only (Phase 1). Accepts only exact divisors of N. Blocks scaling if none found. | Data-sensitive workloads where uneven distribution is unacceptable (e.g., windowed aggregations, partitioned state). |
| *OPTIMIZE_RESOURCE_UTILIZATION* (new) | Minimal | Within-range only (Phase 1). Accepts any *p ≤ N* (every subtask has at least one item). | Cost-sensitive workloads where resource efficiency matters more than even distribution. The autoscaler's computed target is used almost as-is. |
Together with the existing modes, the full design space is:
|| Mode || Alignment strictness || Phases used || Search direction ||
| *EVENLY_SPREAD* | Strict (divisor only) | P1 | Within range only |
| *OPTIMIZE_RESOURCE_UTILIZATION* | Minimal (*p ≤ N*) | P1 | Within range only |
| *MAXIMIZE_UTILISATION* | Relaxed (*N/p < N/new*) | P2 + P3 | Upward outside range + downward fallback |
| *ADAPTIVE_UPWARD_SPREAD* | Combined (divisor → relaxed fallback) | P2 + P3 | Upward outside range + downward fallback |
h2. 6.5. *ADAPTIVE_UPWARD_SPREAD* mode (rename + new default)
The old *EVENLY_SPREAD* behavior, which combined a divisor search with an
implicit *MAXIMIZE_UTILISATION*-style fallback and a downward relaxed search,
is captured by the new *ADAPTIVE_UPWARD_SPREAD* mode. This name explicitly
conveys that it searches *above* the target and adapts between strict and
relaxed alignment. It is now the *default mode*, preserving backward
compatibility with the old *EVENLY_SPREAD* behavior while giving it a more
accurate name.
h2. 6.6. Composable fallback configuration
With properly separated modes, strict modes like *EVENLY_SPREAD* will block
scaling when no divisor exists in the safe range. Previously, the relaxed
fallback was implicitly baked into *EVENLY_SPREAD* with no way to control it.
The new design introduces a separate fallback configuration:
{code:java}
job.autoscaler.scaling.key-group.partitions.adjust.mode.fallback
{code}
*Why this is needed:*
- *Separation of concerns:* The primary mode defines the *preferred* alignment
strategy; the fallback defines what to do when it fails. Mixing both into a
single enum creates combinatorial explosion and unclear semantics.
- *User control:* Different workloads need different trade-offs. A user might
want strict *EVENLY_SPREAD* alignment as the primary strategy but accept
*OPTIMIZE_RESOURCE_UTILIZATION* as a fallback to avoid blocking scaling
entirely. Without a fallback config, the only options are "strict and block" or
"relaxed from the start."
- *Extensibility:* New modes can be added without creating N × M combined enum
values. Any mode can be paired with any fallback.
*Enum:* *KeyGroupOrPartitionsAdjustFallback* with values:
|| Value || Behavior ||
| *DEFAULT* | Uses the mode's built-in fallback. Only *ADAPTIVE_UPWARD_SPREAD* has a built-in fallback (*MAXIMIZE_UTILISATION*). All others resolve to *NONE*. |
| *NONE* | Blocks scaling when no aligned value is found. |
| *ADAPTIVE_UPWARD_SPREAD* | Delegates to the full *ADAPTIVE_UPWARD_SPREAD* algorithm as a second pass. |
| *MAXIMIZE_UTILISATION* | Falls back to the relaxed *MAXIMIZE_UTILISATION* search. |
| *OPTIMIZE_RESOURCE_UTILIZATION* | Falls back to accepting any value where *p ≤ N*. |
The *toMode()* method on the fallback enum maps each value to the corresponding
*KeyGroupOrPartitionsAdjustMode*, enabling the same *applyScaleUpMode* /
*applyScaleDownMode* methods to be reused for both primary and fallback passes
→ no code duplication.
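A sketch of the fallback enum and its *toMode()* mapping, mirroring the table above (hypothetical shape, not the final API):

```java
// Illustrative fallback enum: each value resolves to the mode used for the
// second alignment pass; null means "no fallback, block scaling".
public class FallbackMappingSketch {

    enum Mode { EVENLY_SPREAD, OPTIMIZE_RESOURCE_UTILIZATION, MAXIMIZE_UTILISATION, ADAPTIVE_UPWARD_SPREAD }

    enum Fallback {
        DEFAULT, NONE, ADAPTIVE_UPWARD_SPREAD, MAXIMIZE_UTILISATION, OPTIMIZE_RESOURCE_UTILIZATION;

        Mode toMode(Mode primary) {
            switch (this) {
                case DEFAULT:
                    // Only ADAPTIVE_UPWARD_SPREAD has a built-in fallback.
                    return primary == Mode.ADAPTIVE_UPWARD_SPREAD
                            ? Mode.MAXIMIZE_UTILISATION : null;
                case ADAPTIVE_UPWARD_SPREAD:
                    return Mode.ADAPTIVE_UPWARD_SPREAD;
                case MAXIMIZE_UTILISATION:
                    return Mode.MAXIMIZE_UTILISATION;
                case OPTIMIZE_RESOURCE_UTILIZATION:
                    return Mode.OPTIMIZE_RESOURCE_UTILIZATION;
                case NONE:
                default:
                    return null;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(Fallback.DEFAULT.toMode(Mode.ADAPTIVE_UPWARD_SPREAD)); // MAXIMIZE_UTILISATION
        System.out.println(Fallback.DEFAULT.toMode(Mode.EVENLY_SPREAD));          // null
    }
}
```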
*Example configurations:*
|| Use case || Mode || Fallback || Behavior ||
| Backward-compatible default | *ADAPTIVE_UPWARD_SPREAD* | *DEFAULT* → *MAXIMIZE_UTILISATION* | Tries divisor above target, falls back to relaxed load-decrease search |
| Strict with graceful degradation | *EVENLY_SPREAD* | *OPTIMIZE_RESOURCE_UTILIZATION* | Tries exact divisor in range; if none, accepts any *p ≤ N* |
| Strict, block on failure | *EVENLY_SPREAD* | *NONE* | Exact divisor or nothing — scaling blocked if no divisor in range |
| Maximum resource efficiency | *OPTIMIZE_RESOURCE_UTILIZATION* | *NONE* | Always accepts the autoscaler's target (as long as *p ≤ N*) |
| Strict with upward fallback | *EVENLY_SPREAD* | *ADAPTIVE_UPWARD_SPREAD* | Tries divisor in range; if none, searches above target with full adaptive algorithm |
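Assuming the existing mode option key stays as-is, the "strict with graceful degradation" row would be configured roughly like this (the fallback key is the new one proposed above):

```properties
job.autoscaler.scaling.key-group.partitions.adjust.mode: EVENLY_SPREAD
job.autoscaler.scaling.key-group.partitions.adjust.mode.fallback: OPTIMIZE_RESOURCE_UTILIZATION
```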
h2. 6.7. Proposed Scaling Flow
{code:none}
┌──────────────────────────────────┐
│ applyScaleUpMode(primaryMode)    │ ← self-contained: Phase 1 + 2 + 3
│   returns result or curP         │
└──────────┬───────────────────────┘
           │ result == curP?
           ▼ yes
┌──────────────────────────────────┐
│ applyScaleUpMode(fallbackMode)   │ ← same method, different mode
│   returns result or curP         │
└──────────┬───────────────────────┘
           │ result == curP?
           ▼ yes
  emit SCALING_LIMITED event
  return curP
{code}
h2. 6.8. Compliance-aware warning logging
When both the primary mode and fallback fail to find an aligned value, a *WARN*
log distinguishes between two cases:
- *Compliant:* *currentParallelism* is itself aligned with the mode/fallback;
there is simply no *better* aligned value in the scaling direction.
- *Last resort:* *currentParallelism* does *not* comply with the mode or
fallback; it is returned purely to preserve the scaling direction.
This helps operators diagnose whether the autoscaler is stuck at a good value
(just no room to scale) vs stuck at a bad value (alignment constraint is too
restrictive for the current state).
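The compliant vs last-resort distinction reduces to checking *currentParallelism* against the same alignment predicate (illustrative sketch; strict divisor alignment stands in for the active mode's check):

```java
// Illustrative choice of WARN wording when both alignment passes fail.
public class ComplianceLogSketch {

    /** Strict divisor alignment, standing in for the active mode's predicate. */
    static boolean isAligned(int n, int p) {
        return n % p == 0;
    }

    static String warnKind(int n, int cur) {
        return isAligned(n, cur)
                ? "compliant: current parallelism is aligned, no better value in direction"
                : "last resort: current parallelism violates the alignment constraint";
    }

    public static void main(String[] args) {
        System.out.println(warnKind(10, 5)); // compliant (10 % 5 == 0)
        System.out.println(warnKind(10, 8)); // last resort (10 % 8 != 0)
    }
}
```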
> Inconsistent vertex parallelism alignment logic
> -----------------------------------------------
>
> Key: FLINK-39299
> URL: https://issues.apache.org/jira/browse/FLINK-39299
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Kubernetes Operator
> Affects Versions: 1.15.0
> Reporter: Dennis-Mircea Ciupitu
> Priority: Critical
> Labels: autoscaling, operator, pull-request-available
> Fix For: 1.15.0
>
>
> h1. 1. Summary
> The parallelism alignment logic in *JobVertexScaler#scale* (responsible for
> adjusting computed parallelism to align with {*}numKeyGroupsOrPartitions{*})
> contains several correctness issues that can cause scaling decisions to be
> silently cancelled, inverted in direction, or excessively overshooting the
> target. These problems are most impactful for Kafka source vertices with
> partition counts that are prime or have few small divisors (e.g., 3, 5, 7,
> 11, 13), which are very common in production deployments.
> Additionally, the existing two modes ({*}EVENLY_SPREAD{*} and
> {*}MAXIMIZE_UTILISATION{*}) do not offer sufficient granularity.
> {*}EVENLY_SPREAD{*}, despite its name, actually runs both an upward
> outside-range search and a downward relaxed fallback (making it behave like a
> combined mode), while *MAXIMIZE_UTILISATION* is purely upward. There is no
> way to configure a truly strict within-range divisor-only mode, nor a mode
> that simply accepts the computed target as-is when every subtask has work.
> Furthermore, there is no mechanism to compose modes (e.g., try a strict
> alignment first, then fall back to a more relaxed strategy).
> h1. 2. Root Cause
> The alignment logic consists of two loops:
> 1. *First loop (upward search,
> [L554–L564|https://github.com/apache/flink-kubernetes-operator/blob/56af6bf3a94494923497e709faf29fb3749cd45f/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L554]):*
> Starting from {*}newParallelism{*}, searches upward to
> *upperBoundForAlignment* for a value *p* such that *N % p == 0* (or satisfies
> the *MAXIMIZE_UTILISATION* condition).
> 2. *Fallback loop (downward search,
> [L569–L577|https://github.com/apache/flink-kubernetes-operator/blob/56af6bf3a94494923497e709faf29fb3749cd45f/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L570]):*
> If the first loop fails, searches downward from *newParallelism* toward 0
> for the next "partition boundary" where the integer division *N / p*
> increases.
> Neither loop is aware of the *scaling direction* (scale-up vs. scale-down) or
> the {*}current parallelism{*}. This causes the following systematic issues.
> h1. 3. Notation
> Throughout this report:
> - *N* = *numKeyGroupsOrPartitions*
> - *cur* = *currentParallelism*
> - *new* = *newParallelism* (after capping by bounds at L536)
> - *UB* = *upperBoundForAlignment* = *min(N, upperBound)*
> - *q* = *⌊N / new⌋* (integer division)
> - *divisors(N, [a, b])* = the set of divisors of N in the range [a, b]
> When the first loop fails, the fallback loop result is computed as:
> {code}
> q = ⌊N / new⌋
> p_raw = ⌊N / (q + 1)⌋ // largest p where ⌊N/p⌋ jumps above q
> result = (N % p_raw ≠ 0) ? p_raw + 1 : p_raw
> result = max(result, parallelismLowerLimit)
> {code}
> h1. 4. Problem Cases
> h2. 4.1. Scale-Up (*cur < new*)
> h3. 4.1.1. Case 1: Scale-up completely cancelled (returns *cur*)
> *General formula:*
> {code}
> Preconditions:
> 1. cur < new // scale-up intent
> 2. divisors(N, [new, UB]) = ∅ // first loop fails
> 3. fallback_result == cur // fallback lands on current
> Result: scale() returns cur → computeScaleTargetParallelism sees
> new == cur → noChange(). Scale-up silently dropped.
> {code}
> This is *not* restricted to *cur < N/2*. It applies whenever the fallback
> rounding lands exactly on *cur*, which depends on the divisor structure of N
> relative to *new*.
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 1 |
> | newParallelism (after capping) | 2 |
> | numSourcePartitions (N) | 3 |
> | bounds \[lower, upper\] | \[1, 2\] |
> - *q = ⌊3/2⌋ = 1*, *p_raw = ⌊3/2⌋ = 1*, *3 % 1 = 0* → result = *1* = cur →
> *noChange*.
> - A Kafka source with 3 partitions at parallelism 2 is perfectly valid.
> Another example (showing *cur > N/2* is also affected):
> - cur=3, new=4, N=5, bounds \[1, 4\]
> - *q = ⌊5/4⌋ = 1*, *p_raw = ⌊5/2⌋ = 2*, *5 % 2 ≠ 0* → result = *3* = cur →
> *noChange*.
> h3. 4.1.2. Case 2: Scale-up inverted into scale-down
> *General formula:*
> {code}
> Preconditions:
> 1. cur < new // scale-up intent
> 2. divisors(N, [new, UB]) = ∅ // first loop fails
> 3. fallback_result < cur // fallback drops BELOW current
> Result: scale() returns fallback_result < cur →
> detectBlockScaling sees scaledUp = (cur < fallback_result) = false
> → enters the SCALE-DOWN path. Direction completely inverted.
> {code}
> This is particularly likely when *cur > ⌊N/2⌋*, because divisors of N tend to
> cluster at the low end (*≤ N/2*) and at N itself. When UB prevents reaching
> N, the fallback jumps to a divisor well below *cur*.
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 6 |
> | newParallelism (after capping) | 8 |
> | numSourcePartitions (N) | 10 |
> | bounds \[lower, upper\] | \[1, 8\] |
> - First loop: *p=8 → 10%8=2≠0* → no match.
> - *q = ⌊10/8⌋ = 1*, *p_raw = ⌊10/2⌋ = 5*, *10 % 5 = 0* → result = *5*.
> - *5 < cur(6)* → *detectBlockScaling*: *scaledUp = (6 < 5) = false* →
> *scale-down path*.
> h3. 4.1.3. Case 3: *MAXIMIZE_UTILISATION* mode does not resolve Cases 1 and 2
> *General formula:*
> {code}
> The MAXIMIZE_UTILISATION condition in the first loop (L559-L561) is: [N / p]
> < [N / new]
> This has two structural flaws:
> A) Self-comparison: At p = new (first iteration), this is x < x → always
> FALSE.
> MAXIMIZE_UTILISATION can never accept newParallelism itself.
> B) Integer division plateau: When new > ⌊N/2⌋, then ⌊N/p⌋ = 1 for ALL p
> in [⌈N/2⌉, N-1]. The condition becomes 1 < 1 → FALSE across the entire
> search range. MAXIMIZE_UTILISATION becomes equivalent to EVENLY_SPREAD.
> Therefore: All of Cases 1-2 reproduce identically under MAXIMIZE_UTILISATION
> whenever new > ⌊N/2⌋.
> {code}
> Concrete example (Case 1 under MAXIMIZE\_UTILISATION):
> - cur=1, new=2, N=3, bounds \[1, 2\]:
> - *p=2 → 3%2≠0*. MAXIMIZE\_UTILISATION: *⌊3/2⌋=1 < ⌊3/2⌋=1? NO*.
> - Falls through to same fallback → returns *1* → *noChange*.
> h3. 4.1.4. Case 4: Greedy overshoot
> *General formula:*
> {code}
> Preconditions:
> 1. cur < new // scale-up intent
> 2. divisors(N, [new, UB]) ≠ ∅ // first loop succeeds
> 3. Let d = min(divisors(N, [new, UB])) // smallest divisor found
> 4. d >> new // significant overshoot
> Under EVENLY_SPREAD: d is the smallest divisor of N ≥ new within UB.
> Under MAXIMIZE_UTILISATION: d is the smallest p ≥ new where
> N%p==0 OR ⌊N/p⌋ < ⌊N/new⌋. But due to the integer division plateau
> (when new > ⌊N/2⌋, all quotients = 1), MAXIMIZE_UTILISATION finds
> the same d as EVENLY_SPREAD — directly contradicting its purpose of
> minimizing resource usage.
> Result: scale() returns d > new, wasting (d - new) subtasks of resources.
> {code}
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 5 |
> | newParallelism (after capping) | 8 |
> | numSourcePartitions (N) | 10 |
> | bounds \[lower, upper\] | \[5, 10\] |
> - First loop: *p=8 (10%8≠0), p=9 (10%9≠0), p=10 (10%10=0)* → result = *10*
> (25% overshoot).
> - With MAXIMIZE\_UTILISATION: *⌊10/8⌋=⌊10/9⌋=⌊10/10⌋=1*, plateau → same
> result = *10*.
> - Parallelism 8 for a 10-partition Kafka source is perfectly valid (8
> subtasks, 2 get 2 partitions, 6 get 1).
> h2. 4.2. Scale-Down (*cur > new*)
> h3. Case 1: Scale-down completely cancelled (returns *cur*)
> *General formula:*
> {code}
> Preconditions:
> 1. cur > new // scale-down intent
> 2. divisors(N, [new, cur-1]) = ∅ // no divisor between new and cur
> 3. N % cur == 0 // cur itself is a divisor
> 4. cur ≤ UB // first loop reaches cur
> Result: First loop finds cur as the first divisor ≥ new.
> scale() returns cur → computeScaleTargetParallelism sees
> new == cur → noChange(). Scale-down silently dropped.
> {code}
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 5 |
> | newParallelism (after capping) | 4 |
> | numSourcePartitions (N) | 10 |
> | bounds \[lower, upper\] | \[1, 10\] |
> - First loop: *p=4 → 10%4=2≠0*, *p=5 → 10%5=0* → result = *5* = cur →
> *noChange*.
> Another example (showing *new > N/2* is also affected):
> - cur=9, new=7, N=9, bounds \[1, 9\]
> - First loop: *p=7 (9%7≠0), p=8 (9%8≠0), p=9 (9%9=0)* → result = *9* = cur →
> *noChange*.
> h3. Case 2: Scale-down inverted into scale-up
> *General formula:*
> {code}
> Preconditions:
> 1. cur > new // scale-down intent
> 2. divisors(N, [new, cur]) = ∅ // no divisor at or below cur
> 3. Let d = min(divisors(N, [new, UB])) // first divisor found
> 4. d > cur // overshoots past current
> Result: scale() returns d > cur →
> detectBlockScaling sees scaledUp = (cur < d) = true
> → enters the SCALE-UP path. Direction completely inverted.
> This is particularly likely when new > ⌊N/2⌋, because the only divisor
> of N in [⌈N/2⌉, N] is typically N itself (unless N is even, in which
> case N/2 is also a divisor). So the first loop jumps all the way to N.
> {code}
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 8 |
> | newParallelism (after capping) | 6 |
> | numSourcePartitions (N) | 10 |
> | bounds \[lower, upper\] | \[6, 10\] |
> - First loop: *p=6 (10%6≠0), p=7 (10%7≠0), p=8 (10%8≠0), p=9 (10%9≠0), p=10
> (10%10=0)* → result = *10*.
> - *10 > cur(8)* → *scaledUp = true* → *scale-up path*.
> h3. Case 3: *MAXIMIZE_UTILISATION* does not resolve Cases 5 and 6
> *General formula:*
> {code}
> Same structural issues as Case 3:
> For Case 5: The first loop finds cur because N%cur==0, which triggers
> the modulo check before MAXIMIZE_UTILISATION is even evaluated.
> The mode is irrelevant.
> For Case 6: When new > ⌊N/2⌋, the integer division plateau means
> ⌊N/p⌋ = ⌊N/new⌋ = 1 for all p in [new, N-1]. MAXIMIZE_UTILISATION
> cannot differentiate any candidate and falls through to the same
> divisor (N) as EVENLY_SPREAD.
> {code}
> h2. 4.3. Cross-Cutting: Spurious *ScalingLimited* Warning Events
> *General formula:*
> {code}
> Preconditions:
> 1. divisors(N, [new, UB]) = ∅ // first loop fails
> 2. fallback_result == new // rounding brings result back to
> new
> This happens when:
> p_raw = ⌊N / (q + 1)⌋
> N % p_raw ≠ 0 → result = p_raw + 1 == new
> Result: ScalingLimited event is emitted with "expected: X, actual: X"
> → a completely misleading warning that pollutes the event log, even
> though the scaling proceeds correctly.
> {code}
> *Concrete example:*
> - cur=2, new=3, N=7, bounds \[1, 5\]
> - First loop: *p=3 (7%3≠0), p=4 (7%4≠0), p=5 (7%5≠0)* → fails.
> - *q=⌊7/3⌋=2*, *p_raw=⌊7/3⌋=2*, *7%2≠0* → result = *3* = new.
> - ScalingLimited warning: "expected: 3, actual: 3" ← false alarm.
> h1. 5. Impact
> - *Systematic, not isolated:* The formulas above show these issues are
> determined by the divisor structure of N relative to *new*, *cur*, and *UB*.
> Any partition count with sparse divisors triggers them which includes all
> primes and most semi-primes, which are extremely common in production Kafka
> deployments.
> - Combined with tight *VERTEX_MAX_PARALLELISM* settings, the alignment logic
> can render the autoscaler completely unable to scale a vertex.
> - Direction inversion (Cases 2 and 6) can cause resource oscillation and
> instability.
> - *MAXIMIZE_UTILISATION* mode fails to provide any benefit in the most
> problematic scenarios (when *new > ⌊N/2⌋*), despite being specifically
> designed for these use cases.
> - Spurious *ScalingLimited* events reduce signal-to-noise ratio in monitoring.
> h1. 6. Proposed Solution
> The new proposed solution covers:
> - *Direction safety by construction* - separate scale-up/scale-down paths
> with structural guarantees.
> - *Single-responsibility modes* - the old combined EVENLY_SPREAD is
> decomposed into distinct building blocks.
> - *Composable mode + fallback* - any mode paired with any fallback, no
> combinatorial explosion.
> - *Open for extension* - new modes (e.g., *ADAPTIVE_DOWNWARD_SPREAD*,
> NEAREST_SPREAD*) only need to implement the
> *searchesWithinRange*/*allowsOutsideRange* predicates and an *isAligned*
> branch; new fallbacks only need a *toMode()* mapping. The orchestration logic
> in *scaleUp()*/*scaleDown()* and the fallback framework remain untouched.
> h2. 6.1. Direction-aware *scaleUp()* / *scaleDown()* entry points
> Split the single alignment code path into two direction-aware methods:
> - *scaleUp()* → guarantees the result is strictly *above*
> *currentParallelism*, or returns *currentParallelism* (sentinel = "no aligned
> value found").
> - *scaleDown()* → guarantees the result is strictly *below*
> *currentParallelism*, or returns *currentParallelism*.
> The *scale()* method dispatches to the appropriate one based on
> *newParallelism > currentParallelism* vs *newParallelism <
> currentParallelism*.
> h2. 6.2. Three-phase alignment algorithm per mode
> Each mode's algorithm is structured into three phases (first match wins):
> | Phase | Description | Active for |
> | *Phase 1* (within range) | Divisor search between *currentParallelism* and
> *newParallelism*. Scale-up searches downward from target; scale-down searches
> upward. | *EVENLY_SPREAD*, *OPTIMIZE_RESOURCE_UTILIZATION* |
> | *Phase 2* (upward outside range) | Searches upward from *newParallelism* to
> *upper align limit*. Accepts exact divisors; *MAXIMIZE_UTILISATION*
> additionally accepts values where per-subtask load decreases. In scale-down,
> capped at *min(currentParallelism, upper align limit)* to prevent crossing. |
> *MAXIMIZE_UTILISATION*, *ADAPTIVE_UPWARD_SPREAD* |
> | *Phase 3* (relaxed downward fallback) | Searches downward from
> *newParallelism* for the boundary where per-subtask load increases, then
> snaps up to the nearest divisor-aligned value. In scale-up, a *direction
> guard* rejects results ≤ *currentParallelism*. | *MAXIMIZE_UTILISATION*,
> *ADAPTIVE_UPWARD_SPREAD* |
> Phase 2 and Phase 3 share the same `allowsOutsideRange(mode)` guard and
> identical logic between scale-up and scale-down. They are extracted into a
> single shared method *applyOutsideRangeSearch()*, parameterized by:
> - `phase2UpperBound` → `upper align limit` for scale-up;
> `min(currentParallelism, upper align limit)` for scale-down.
> - `isScaleUp` → enables the direction guard in Phase 3.
> h2. 6.3. Search ranges diagram
> {code:none}
> PARALLELISM ALIGNMENT — SEARCH RANGES PER MODE
> ══════════════════════════════════════════════════════════════════════════════════════
> Complete number line with all key barriers:
> lower align new ↓ current new ↑ upper align
> limit¹ target² parallelism target³ limit⁴
> │ │ │ │ │
> ────┴───────────────┴───────────────┴───────────────┴───────────────┴──►
> │ │ │ │ │
> │ ◄──── scale-down zone ─────► │ ◄────── scale-up zone ─────► │
¹ parallelismLowerLimit — floor for alignment results
² newParallelism when scaling down
³ newParallelism when scaling up
⁴ upperBoundForAlignment = min(N, min(maxParallelism, parallelismUpperLimit))
  N = numKeyGroupsOrPartitions (key groups or source partitions)

Legend:  ◄ = downward search direction    ► = upward search direction
         ├──────┤ = search range          P1/P2/P3 = phase (first match wins)

═══════════════════════════════════════════════════════════════════════
 SCALE-UP  (newParallelism > currentParallelism)
═══════════════════════════════════════════════════════════════════════
                          currentP          new(↑)      upper align limit
                             │                 │                │
                        ─────┼─────────────────┼────────────────┼──►
 EVENLY_SPREAD           P1:                   ├────────◄───────┤    N % p == 0
 OPTIMIZE_RESOURCE_      P1:                   ├────────◄───────┤    p ≤ N
 UTILIZATION
 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
 MAXIMIZE_UTILISATION    P2:                   ├─────►─────┤         N%p==0 ∨ N/p<N/new
                         P3:                   ├────────◄───────┤    snap to load boundary
 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
 ADAPTIVE_UPWARD_        P2:                   ├─────►─────┤         N % p == 0
 SPREAD                  P3:                   ├────────◄───────┤    snap to load boundary

 P3 direction guard: result must be > currentP, otherwise discarded

═══════════════════════════════════════════════════════════════════════
 SCALE-DOWN  (newParallelism < currentParallelism)
═══════════════════════════════════════════════════════════════════════
                         lower align limit     new(↓)        currentP
                             │                   │               │
                        ─────┼───────────────────┼───────────────┼──►
 EVENLY_SPREAD           P1: ├─────────►─────────┤                   N % p == 0
 OPTIMIZE_RESOURCE_      P1: ├─────────►─────────┤                   p ≤ N
 UTILIZATION
 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
 MAXIMIZE_UTILISATION    P2: ├─────────►─────────┤⁵                  N%p==0 ∨ N/p<N/new
                         P3:         ├───────◄───┤                   snap to load boundary
 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
 ADAPTIVE_UPWARD_        P2: ├─────────►─────────┤⁵                  N % p == 0
 SPREAD                  P3:         ├───────◄───┤                   snap to load boundary

 ⁵ Phase 2 upper bound capped at min(currentP, upper align limit) to prevent
   direction inversion
 Phase 3 results are inherently ≤ new(↓) < currentP → no direction guard needed

═══════════════════════════════════════════════════════════════════════
 MODE-THEN-FALLBACK FLOW  (applies to both scale-up and scale-down)
═══════════════════════════════════════════════════════════════════════
 ┌─────────────────────┐  found?   ┌──────────┐
 │ Run primary mode's  ├─── YES ──►│ Return   │
 │ full P1 → P2 → P3   │           │ result   │
 └─────────┬───────────┘           └──────────┘
           │ NO (sentinel)
           ▼
 ┌─────────────────────┐  found?   ┌──────────┐
 │ Run fallback mode's ├─── YES ──►│ Return   │
 │ full P1 → P2 → P3   │           │ result   │
 └─────────┬───────────┘           └──────────┘
           │ NO (sentinel)
           ▼
 ┌─────────────────────────────────────────────────┐
 │ WARN log (compliant vs last-resort check)       │
 │ Emit SCALING_LIMITED event                      │
 │ Return currentParallelism (scaling blocked)     │
 └─────────────────────────────────────────────────┘
{code}
h2. 6.4. New modes: *EVENLY_SPREAD* (redefined) and *OPTIMIZE_RESOURCE_UTILIZATION*
The original implementation offered only two modes (*EVENLY_SPREAD* and *MAXIMIZE_UTILISATION*), but *EVENLY_SPREAD* was misleadingly named: it actually ran both an upward outside-range search *and* a downward relaxed fallback (the same three-phase algorithm now explicitly named Phase 1+2+3), making it behave like a combined strategy rather than a strict divisor-only mode. This made it impossible for users to select a truly conservative alignment that blocks scaling when no divisor exists in the safe range.
The new design introduces two additional modes to cover the full spectrum of alignment strictness:
||Mode||Strictness||Search behavior||When to use||
|*EVENLY_SPREAD* (redefined)|Strict|Within-range only (Phase 1). Accepts only exact divisors of N. Blocks scaling if none found.|Data-sensitive workloads where uneven distribution is unacceptable (e.g., windowed aggregations, partitioned state).|
|*OPTIMIZE_RESOURCE_UTILIZATION* (new)|Minimal|Within-range only (Phase 1). Accepts any *p ≤ N* (every subtask has at least one item).|Cost-sensitive workloads where resource efficiency matters more than even distribution. The autoscaler's computed target is used almost as-is.|
Together with the existing modes, the full design space is:
||Mode||Alignment strictness||Phases used||Search direction||
|*EVENLY_SPREAD*|Strict (divisor only)|P1|Within range only|
|*OPTIMIZE_RESOURCE_UTILIZATION*|Minimal (*p ≤ N*)|P1|Within range only|
|*MAXIMIZE_UTILISATION*|Relaxed (*N/p < N/new*)|P2 + P3|Upward outside range + downward fallback|
|*ADAPTIVE_UPWARD_SPREAD*|Combined (divisor → relaxed fallback)|P2 + P3|Upward outside range + downward fallback|
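To make the two Phase-1 acceptance conditions concrete, here is a minimal, hypothetical Java sketch. It is not the actual *JobVertexScaler* code: the method names and the simple upward iteration order are illustrative, and the real search direction and bounds follow the diagrams above; only the acceptance conditions (*N % p == 0* vs *p ≤ N*) are taken from the table.
{code:java}
// Hedged sketch of the Phase-1 (within-range) acceptance conditions.
// n = numKeyGroupsOrPartitions, search range [newParallelism, upperBound].
public class Phase1AlignmentSketch {

    /** EVENLY_SPREAD (redefined): accept only exact divisors of n. */
    static int evenlySpread(int n, int newParallelism, int upperBound) {
        for (int p = newParallelism; p <= upperBound; p++) {
            if (n % p == 0) {
                return p; // every subtask processes exactly n / p items
            }
        }
        return -1; // sentinel: no divisor in range, scaling blocked (with NONE fallback)
    }

    /** OPTIMIZE_RESOURCE_UTILIZATION: accept any p <= n (every subtask has work). */
    static int optimizeResourceUtilization(int n, int newParallelism, int upperBound) {
        for (int p = newParallelism; p <= upperBound; p++) {
            if (p <= n) {
                return p; // the computed target is used almost as-is
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Prime partition count n = 7, target 3, upper alignment bound 7:
        System.out.println(evenlySpread(7, 3, 7));                // 7 (only divisor in range)
        System.out.println(optimizeResourceUtilization(7, 3, 7)); // 3 (target accepted as-is)
        // n = 7, target 3, upper bound 5: no divisor, strict mode blocks
        System.out.println(evenlySpread(7, 3, 5));                // -1
    }
}
{code}
The prime case (n = 7) shows why the strict mode needs a fallback: without one, any target between 2 and 6 is unalignable.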
h2. 6.5. *ADAPTIVE_UPWARD_SPREAD* mode (rename + new default)
The old *EVENLY_SPREAD* behavior, which combined a divisor search with an implicit *MAXIMIZE_UTILISATION*-style fallback and a downward relaxed search, is captured by the new *ADAPTIVE_UPWARD_SPREAD* mode. This name explicitly conveys that it searches *above* the target and adapts between strict and relaxed alignment. It is now the *default mode*, preserving backward compatibility with the old *EVENLY_SPREAD* behavior while giving it a more accurate name.
h2. 6.6. Composable fallback configuration
With properly separated modes, strict modes like *EVENLY_SPREAD* will block scaling when no divisor exists in the safe range. Previously, the relaxed fallback was implicitly baked into *EVENLY_SPREAD* with no way to control it. The new design introduces a separate fallback configuration:
{code:java}
job.autoscaler.scaling.key-group.partitions.adjust.mode.fallback
{code}
*Why this is needed:*
- *Separation of concerns:* The primary mode defines the *preferred* alignment strategy; the fallback defines what to do when it fails. Mixing both into a single enum creates combinatorial explosion and unclear semantics.
- *User control:* Different workloads need different trade-offs. A user might want strict *EVENLY_SPREAD* alignment as the primary strategy but accept *OPTIMIZE_RESOURCE_UTILIZATION* as a fallback to avoid blocking scaling entirely. Without a fallback config, the only options are "strict and block" or "relaxed from the start."
- *Extensibility:* New modes can be added without creating N × M combined enum values. Any mode can be paired with any fallback.
*Enum:* *KeyGroupOrPartitionsAdjustFallback* with values:
||Value||Behavior||
|*DEFAULT*|Uses the mode's built-in fallback. Only *ADAPTIVE_UPWARD_SPREAD* has a built-in fallback (*MAXIMIZE_UTILISATION*). All others resolve to *NONE*.|
|*NONE*|Blocks scaling when no aligned value is found.|
|*ADAPTIVE_UPWARD_SPREAD*|Delegates to the full *ADAPTIVE_UPWARD_SPREAD* algorithm as a second pass.|
|*MAXIMIZE_UTILISATION*|Falls back to the relaxed *MAXIMIZE_UTILISATION* search.|
|*OPTIMIZE_RESOURCE_UTILIZATION*|Falls back to accepting any value where *p ≤ N*.|
The *toMode()* method on the fallback enum maps each value to the corresponding *KeyGroupOrPartitionsAdjustMode*, so the same *applyScaleUpMode* / *applyScaleDownMode* methods can be reused for both the primary and fallback passes, avoiding code duplication.
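A possible shape of this enum pair is sketched below. The enum names and values come from the table above, but the exact *toMode()* signature is an assumption: here it takes the primary mode as a parameter so that *DEFAULT* can resolve differently per mode, and returns null to mean "no fallback pass" (the actual patch may encode this differently).
{code:java}
// Hedged sketch, not the actual patch. Mode values mirror section 6.4.
enum KeyGroupOrPartitionsAdjustMode {
    EVENLY_SPREAD,
    OPTIMIZE_RESOURCE_UTILIZATION,
    MAXIMIZE_UTILISATION,
    ADAPTIVE_UPWARD_SPREAD
}

enum KeyGroupOrPartitionsAdjustFallback {
    DEFAULT,
    NONE,
    ADAPTIVE_UPWARD_SPREAD,
    MAXIMIZE_UTILISATION,
    OPTIMIZE_RESOURCE_UTILIZATION;

    /**
     * Maps this fallback value to the mode whose algorithm runs as the
     * second pass; null means no fallback pass (scaling may be blocked).
     */
    KeyGroupOrPartitionsAdjustMode toMode(KeyGroupOrPartitionsAdjustMode primary) {
        switch (this) {
            case DEFAULT:
                // Only ADAPTIVE_UPWARD_SPREAD has a built-in fallback;
                // all other primary modes resolve DEFAULT to NONE.
                return primary == KeyGroupOrPartitionsAdjustMode.ADAPTIVE_UPWARD_SPREAD
                        ? KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION
                        : null;
            case NONE:
                return null; // block scaling when no aligned value is found
            case ADAPTIVE_UPWARD_SPREAD:
                return KeyGroupOrPartitionsAdjustMode.ADAPTIVE_UPWARD_SPREAD;
            case MAXIMIZE_UTILISATION:
                return KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION;
            default:
                return KeyGroupOrPartitionsAdjustMode.OPTIMIZE_RESOURCE_UTILIZATION;
        }
    }
}

class FallbackDemo {
    public static void main(String[] args) {
        System.out.println(KeyGroupOrPartitionsAdjustFallback.DEFAULT
                .toMode(KeyGroupOrPartitionsAdjustMode.ADAPTIVE_UPWARD_SPREAD));
    }
}
{code}
Because the fallback resolves to an ordinary mode, the fallback pass is just a second invocation of the existing alignment methods.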
*Example configurations:*
||Use case||Mode||Fallback||Behavior||
|Backward-compatible default|*ADAPTIVE_UPWARD_SPREAD*|*DEFAULT* → *MAXIMIZE_UTILISATION*|Tries divisor above target, falls back to relaxed load-decrease search|
|Strict with graceful degradation|*EVENLY_SPREAD*|*OPTIMIZE_RESOURCE_UTILIZATION*|Tries exact divisor in range; if none, accepts any *p ≤ N*|
|Strict, block on failure|*EVENLY_SPREAD*|*NONE*|Exact divisor or nothing: scaling blocked if no divisor in range|
|Maximum resource efficiency|*OPTIMIZE_RESOURCE_UTILIZATION*|*NONE*|Always accepts the autoscaler's target (as long as *p ≤ N*)|
|Strict with upward fallback|*EVENLY_SPREAD*|*ADAPTIVE_UPWARD_SPREAD*|Tries divisor in range; if none, searches above target with full adaptive algorithm|
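As an illustration, the "strict with graceful degradation" row would look roughly as follows in the job configuration; the primary-mode key name is an assumption based on the existing *job.autoscaler.scaling.key-group.partitions.adjust.mode* option, only the *.fallback* key is introduced by this proposal:
{code:none}
job.autoscaler.scaling.key-group.partitions.adjust.mode: EVENLY_SPREAD
job.autoscaler.scaling.key-group.partitions.adjust.mode.fallback: OPTIMIZE_RESOURCE_UTILIZATION
{code}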
h2. 6.7. Proposed Scaling Flow
{code:none}
┌──────────────────────────────────┐
│ applyScaleUpMode(primaryMode)    │ ← self-contained: Phase 1 + 2 + 3
│ returns result or curP           │
└──────────┬───────────────────────┘
           │ result == curP?
           ▼ yes
┌──────────────────────────────────┐
│ applyScaleUpMode(fallbackMode)   │ ← same method, different mode
│ returns result or curP           │
└──────────┬───────────────────────┘
           │ result == curP?
           ▼ yes
emit SCALING_LIMITED event
return curP
{code}
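The flow above can be sketched in a few lines of Java. This is a simplified illustration, not the proposed implementation: the *AlignPass* interface and *alignOrBlock* name are hypothetical stand-ins for *applyScaleUpMode* / *applyScaleDownMode*, and the sentinel convention (a pass returns *curP* when nothing was found) is taken from the diagram.
{code:java}
// Hedged sketch of the mode-then-fallback flow.
public class ModeThenFallbackSketch {

    /** A mode's full P1 -> P2 -> P3 search; returns curP as the "not found" sentinel. */
    interface AlignPass {
        int apply(int curP);
    }

    static int alignOrBlock(int curP, AlignPass primary, AlignPass fallback) {
        int result = primary.apply(curP);   // first pass: primary mode
        if (result != curP) {
            return result;
        }
        if (fallback != null) {             // second pass: configured fallback mode
            result = fallback.apply(curP);
            if (result != curP) {
                return result;
            }
        }
        // Both passes failed: a WARN log and a SCALING_LIMITED event
        // would be emitted here before returning.
        return curP;                        // scaling blocked, direction preserved
    }

    public static void main(String[] args) {
        // Hypothetical passes: the strict pass finds nothing, the relaxed pass finds 8.
        AlignPass strict = cur -> cur;
        AlignPass relaxed = cur -> 8;
        System.out.println(alignOrBlock(4, strict, relaxed)); // 8
        System.out.println(alignOrBlock(4, strict, null));    // 4 (blocked)
    }
}
{code}
Reusing one pass method for both stages is what keeps the fallback composable: any mode can serve as either the primary or the fallback strategy.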
h2. 6.8. Compliance-aware warning logging
When both the primary mode and the fallback fail to find an aligned value, a *WARN* log distinguishes between two cases:
- *Compliant:* *currentParallelism* is itself aligned with the mode/fallback; there is simply no *better* aligned value in the scaling direction.
- *Last resort:* *currentParallelism* does *not* comply with the mode or fallback; it is returned purely to preserve the scaling direction.
This helps operators diagnose whether the autoscaler is stuck at a good value (just no room to scale) or stuck at a bad value (the alignment constraint is too restrictive for the current state).
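The distinction boils down to re-running the mode's acceptance check on the current value. A minimal sketch, assuming an *EVENLY_SPREAD*-style divisor condition (the helper names *isAligned* and *classify* are hypothetical, not from the patch):
{code:java}
// Hedged sketch of the compliant vs last-resort classification.
public class ComplianceWarningSketch {

    /** EVENLY_SPREAD-style acceptance: p has work and divides n evenly. */
    static boolean isAligned(int n, int p) {
        return p <= n && n % p == 0;
    }

    static String classify(int n, int currentParallelism) {
        return isAligned(n, currentParallelism)
                ? "compliant: current value is aligned, just no better value in the scaling direction"
                : "last resort: current value violates the mode, kept only to preserve direction";
    }

    public static void main(String[] args) {
        System.out.println(classify(12, 6)); // compliant (12 % 6 == 0)
        System.out.println(classify(7, 4));  // last resort (7 % 4 != 0)
    }
}
{code}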
--
This message was sent by Atlassian Jira
(v8.20.10#820010)