[ 
https://issues.apache.org/jira/browse/FLINK-39299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dennis-Mircea Ciupitu updated FLINK-39299:
------------------------------------------
    Affects Version/s: 1.15.4
                           (was: 1.15.0)

> Inconsistent vertex parallelism alignment logic
> -----------------------------------------------
>
>                 Key: FLINK-39299
>                 URL: https://issues.apache.org/jira/browse/FLINK-39299
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Kubernetes Operator
>    Affects Versions: 1.15.4
>            Reporter: Dennis-Mircea Ciupitu
>            Priority: Critical
>              Labels: autoscaling, operator, pull-request-available
>             Fix For: 1.15.0
>
>
> h1. 1. Summary
> The parallelism alignment logic in *JobVertexScaler#scale* (responsible for 
> adjusting computed parallelism to align with {*}numKeyGroupsOrPartitions{*}) 
> contains several correctness issues that can cause a scaling decision to be 
> silently cancelled, inverted in direction, or pushed far beyond the 
> target. These problems are most impactful for Kafka source vertices with 
> partition counts that are prime or have few small divisors (e.g., 3, 5, 7, 
> 11, 13), which are very common in production deployments.
> Additionally, the existing two modes ({*}EVENLY_SPREAD{*} and 
> {*}MAXIMIZE_UTILISATION{*}) do not offer sufficient granularity. 
> {*}EVENLY_SPREAD{*}, despite its name, actually runs both an upward 
> outside-range search and a downward relaxed fallback (making it behave like a 
> combined mode), while *MAXIMIZE_UTILISATION* is purely upward. There is no 
> way to configure a truly strict within-range divisor-only mode, nor a mode 
> that simply accepts the computed target as-is when every subtask has work. 
> Furthermore, there is no mechanism to compose modes (e.g., try a strict 
> alignment first, then fall back to a more relaxed strategy).
> h1. 2. Root Cause
> The alignment logic consists of two loops:
> 1. *First loop (upward search, 
> [L554–L564|https://github.com/apache/flink-kubernetes-operator/blob/56af6bf3a94494923497e709faf29fb3749cd45f/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L554]):*
>  Starting from {*}newParallelism{*}, searches upward to 
> *upperBoundForAlignment* for a value *p* such that *N % p == 0* (or satisfies 
> the *MAXIMIZE_UTILISATION* condition).
> 2. *Fallback loop (downward search, 
> [L569–L577|https://github.com/apache/flink-kubernetes-operator/blob/56af6bf3a94494923497e709faf29fb3749cd45f/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L570]):*
>  If the first loop fails, searches downward from *newParallelism* toward 0 
> for the next "partition boundary" where the integer division *N / p* 
> increases.
> Neither loop is aware of the *scaling direction* (scale-up vs. scale-down) or 
> the {*}current parallelism{*}. This causes the following systematic issues.
> h1. 3. Notation
> Throughout this report:
> - *N* = *numKeyGroupsOrPartitions*
> - *cur* = *currentParallelism*
> - *new* = *newParallelism* (after capping by bounds at L536)
> - *UB* = *upperBoundForAlignment* = *min(N, upperBound)*
> - *q* = *⌊N / new⌋* (integer division)
> - *divisors(N, [a, b])* = the set of divisors of N in the range [a, b]
> When the first loop fails, the fallback loop result is computed as:
> {code}
> q      = ⌊N / new⌋
> p_raw  = ⌊N / (q + 1)⌋          // largest p where ⌊N/p⌋ jumps above q
> result = (N % p_raw ≠ 0) ? p_raw + 1 : p_raw
> result = max(result, parallelismLowerLimit)
> {code}
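> The two loops and the closed-form fallback above can be modelled in a few 
> lines. This is a minimal Python sketch, not the operator's Java code: it 
> follows the notation of this section (*UB = min(N, upperBound)*), and the 
> names *first_loop*, *fallback* and *legacy_align* are hypothetical.

```python
def first_loop(new, n, ub, maximize_utilisation=False):
    """Upward search over [new, ub]; returns the first accepted p or None."""
    for p in range(new, ub + 1):
        divides = n % p == 0
        # Extra clause that is only active in MAXIMIZE_UTILISATION mode.
        lowers_load = maximize_utilisation and n // p < n // new
        if divides or lowers_load:
            return p
    return None


def fallback(new, n, lower_limit=1):
    """Closed form of the downward search from the Notation section."""
    q = n // new
    p_raw = n // (q + 1)
    # p_raw == 0 means no p >= 1 raises the quotient; skip the modulo then.
    if p_raw > 0 and n % p_raw != 0:
        p_raw += 1
    return max(p_raw, lower_limit)


def legacy_align(new, n, upper_bound, lower_limit=1, maximize_utilisation=False):
    """First loop, then fallback, exactly in that order."""
    ub = min(n, upper_bound)
    found = first_loop(new, n, ub, maximize_utilisation)
    return found if found is not None else fallback(new, n, lower_limit)


# (cur, new, N, upper bound) tuples taken from the examples in this report.
for cur, new, n, upper in [(1, 2, 3, 2), (3, 4, 5, 4), (6, 8, 10, 8), (5, 8, 10, 10)]:
    print(f"cur={cur} new={new} N={n} -> {legacy_align(new, n, upper)}")
```

> For the four inputs shown, the model returns 1, 3, 5 and 10, matching the 
> cancelled, inverted and overshooting results described in Section 4.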
> h1. 4. Problem Cases
> h2. 4.1. Scale-Up (*cur < new*)
> h3. 4.1.1. Case 1: Scale-up completely cancelled (returns *cur*)
> *General formula:*
> {code}
> Preconditions:
>   1. cur < new                              // scale-up intent
>   2. divisors(N, [new, UB]) = ∅             // first loop fails
>   3. fallback_result == cur                 // fallback lands on current
> Result: scale() returns cur → computeScaleTargetParallelism sees
>   new == cur → noChange(). Scale-up silently dropped.
> {code}
> This is *not* restricted to *cur < N/2*. It applies whenever the fallback 
> rounding lands exactly on *cur*, which depends on the divisor structure of N 
> relative to *new*.
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 1 |
> | newParallelism (after capping) | 2 |
> | numSourcePartitions (N) | 3 |
> | bounds \[lower, upper\] | \[1, 2\] |
> - *q = ⌊3/2⌋ = 1*, *p_raw = ⌊3/2⌋ = 1*, *3 % 1 = 0* → result = *1* = cur → 
> *noChange*.
> - A Kafka source with 3 partitions at parallelism 2 is perfectly valid.
> Another example (showing *cur > N/2* is also affected):
> - cur=3, new=4, N=5, bounds \[1, 4\]
> - *q = ⌊5/4⌋ = 1*, *p_raw = ⌊5/2⌋ = 2*, *5 % 2 ≠ 0* → result = *3* = cur → 
> *noChange*.
> h3. 4.1.2. Case 2: Scale-up inverted into scale-down
> *General formula:*
> {code}
> Preconditions:
>   1. cur < new                             // scale-up intent
>   2. divisors(N, [new, UB]) = ∅            // first loop fails
>   3. fallback_result < cur                 // fallback drops BELOW current
> Result: scale() returns fallback_result < cur →
>   detectBlockScaling sees scaledUp = (cur < fallback_result) = false
>   → enters the SCALE-DOWN path. Direction completely inverted.
> {code}
> This is particularly likely when *cur > ⌊N/2⌋*, because every divisor of N 
> other than N itself is at most *N/2*. When UB prevents reaching N, the 
> fallback drops to roughly *N/2*, which can fall below *cur*.
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 6 |
> | newParallelism (after capping) | 8 |
> | numSourcePartitions (N) | 10 |
> | bounds \[lower, upper\] | \[1, 8\] |
> - First loop: *p=8 → 10%8=2≠0* → no match.
> - *q = ⌊10/8⌋ = 1*, *p_raw = ⌊10/2⌋ = 5*, *10 % 5 = 0* → result = *5*.
> - *5 < cur(6)* → *detectBlockScaling*: *scaledUp = (6 < 5) = false* → 
> *scale-down path*.
> h3. 4.1.3. Case 3: *MAXIMIZE_UTILISATION* mode does not resolve Cases 1 and 2
> *General formula:*
> {code}
> The MAXIMIZE_UTILISATION condition in the first loop (L559-L561) is:
>   ⌊N / p⌋ < ⌊N / new⌋
> This has two structural flaws:
> A) Self-comparison: At p = new (first iteration), this is x < x → always FALSE.
>    MAXIMIZE_UTILISATION can never accept newParallelism itself.
> B) Integer division plateau: When new > ⌊N/2⌋, then ⌊N/p⌋ = 1 for ALL p
>    in [⌊N/2⌋ + 1, N]. The condition becomes 1 < 1 → FALSE across the entire
>    search range. MAXIMIZE_UTILISATION becomes equivalent to EVENLY_SPREAD.
> Therefore: All of Cases 1-2 reproduce identically under MAXIMIZE_UTILISATION
> whenever new > ⌊N/2⌋.
> {code}
> Concrete example (Case 1 under MAXIMIZE\_UTILISATION):
> - cur=1, new=2, N=3, bounds \[1, 2\]:
> - *p=2 → 3%2≠0*. MAXIMIZE\_UTILISATION: *⌊3/2⌋=1 < ⌊3/2⌋=1? NO*.
> - Falls through to same fallback → returns *1* → *noChange*.
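> The plateau can be checked by listing the values that only the 
> *MAXIMIZE_UTILISATION* clause would accept. This is an illustrative Python 
> sketch; the helper name *utilisation_only_matches* is hypothetical and the 
> search range is taken as *[new, UB]* per the notation above.

```python
def utilisation_only_matches(n, new, ub):
    """Values p in [new, ub] accepted by the MAXIMIZE_UTILISATION clause alone:
    p does not divide n, but floor(n / p) < floor(n / new)."""
    return [p for p in range(new, ub + 1) if n % p != 0 and n // p < n // new]


n = 10
for new in range(1, n + 1):
    # Once new exceeds floor(n / 2) the list is empty: the clause never fires.
    print(new, utilisation_only_matches(n, new, n))
```

> For *N = 10* the list is non-empty for small targets (for example 
> *new = 3*) and empty for every *new* from 6 to 10, which is the range where 
> the mode degenerates to the plain divisor search.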
> h3. 4.1.4. Case 4: Greedy overshoot
> *General formula:*
> {code}
> Preconditions:
>   1. cur < new                             // scale-up intent
>   2. divisors(N, [new, UB]) ≠ ∅            // first loop succeeds
>   3. Let d = min(divisors(N, [new, UB]))   // smallest divisor found
>   4. d >> new                              // significant overshoot
> Under EVENLY_SPREAD: d is the smallest divisor of N ≥ new within UB.
> Under MAXIMIZE_UTILISATION: d is the smallest p ≥ new where
>   N%p==0 OR ⌊N/p⌋ < ⌊N/new⌋. But due to the integer division plateau
>   (when new > ⌊N/2⌋, all quotients = 1), MAXIMIZE_UTILISATION finds
>   the same d as EVENLY_SPREAD — directly contradicting its purpose of
>   minimizing resource usage.
> Result: scale() returns d > new, wasting (d - new) subtasks of resources.
> {code}
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 5 |
> | newParallelism (after capping) | 8 |
> | numSourcePartitions (N) | 10 |
> | bounds \[lower, upper\] | \[5, 10\] |
> - First loop: *p=8 (10%8≠0), p=9 (10%9≠0), p=10 (10%10=0)* → result = *10* 
> (25% overshoot).
> - With MAXIMIZE\_UTILISATION: *⌊10/8⌋=⌊10/9⌋=⌊10/10⌋=1*, plateau → same 
> result = *10*.
> - Parallelism 8 for a 10-partition Kafka source is perfectly valid (8 
> subtasks, 2 get 2 partitions, 6 get 1).
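> The per-subtask load behind that last bullet is simple arithmetic. The 
> Python sketch below assumes partitions are handed out as evenly as possible 
> (the actual assignment is done by the source connector and may differ); 
> *partitions_per_subtask* is a hypothetical helper.

```python
def partitions_per_subtask(n, p):
    """Spread n partitions over p subtasks as evenly as possible."""
    base, extra = divmod(n, p)
    # 'extra' subtasks carry one partition more than the rest.
    return [base + 1] * extra + [base] * (p - extra)


for p in (8, 10):
    load = partitions_per_subtask(10, p)
    print(f"p={p}: {load} (max per subtask: {max(load)})")
```

> At parallelism 8 no subtask is idle and the busiest subtask reads two 
> partitions; going to 10 only lowers that maximum to one, at the cost of two 
> extra subtasks.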
> h2. 4.2. Scale-Down (*cur > new*)
> h3. 4.2.1. Case 5: Scale-down completely cancelled (returns *cur*)
> *General formula:*
> {code}
> Preconditions:
>   1. cur > new                             // scale-down intent
>   2. divisors(N, [new, cur-1]) = ∅         // no divisor between new and cur
>   3. N % cur == 0                          // cur itself is a divisor
>   4. cur ≤ UB                              // first loop reaches cur
> Result: First loop finds cur as the first divisor ≥ new.
>   scale() returns cur → computeScaleTargetParallelism sees
>   new == cur → noChange(). Scale-down silently dropped.
> {code}
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 5 |
> | newParallelism (after capping) | 4 |
> | numSourcePartitions (N) | 10 |
> | bounds \[lower, upper\] | \[1, 10\] |
> - First loop: *p=4 → 10%4=2≠0*, *p=5 → 10%5=0* → result = *5* = cur → 
> *noChange*.
> Another example (showing *new > N/2* is also affected):
> - cur=9, new=7, N=9, bounds \[1, 9\]
> - First loop: *p=7 (9%7≠0), p=8 (9%8≠0), p=9 (9%9=0)* → result = *9* = cur → 
> *noChange*.
> h3. 4.2.2. Case 6: Scale-down inverted into scale-up
> *General formula:*
> {code}
> Preconditions:
>   1. cur > new                             // scale-down intent
>   2. divisors(N, [new, cur]) = ∅           // no divisor from new up to cur
>   3. Let d = min(divisors(N, [new, UB]))   // first divisor found
>   4. d > cur                               // overshoots past current
> Result: scale() returns d > cur →
>   detectBlockScaling sees scaledUp = (cur < d) = true
>   → enters the SCALE-UP path. Direction completely inverted.
> This is particularly likely when new > ⌊N/2⌋, because N is then the only
> divisor of N in [new, N]. So the first loop jumps all the way to N
> whenever UB allows it.
> {code}
> *Concrete example:*
> || Parameter || Value ||
> | currentParallelism | 8 |
> | newParallelism (after capping) | 6 |
> | numSourcePartitions (N) | 10 |
> | bounds \[lower, upper\] | \[6, 10\] |
> - First loop: *p=6 (10%6≠0), p=7 (10%7≠0), p=8 (10%8≠0), p=9 (10%9≠0), p=10 
> (10%10=0)* → result = *10*.
> - *10 > cur(8)* → *scaledUp = true* → *scale-up path*.
> h3. 4.2.3. Case 7: *MAXIMIZE_UTILISATION* does not resolve Cases 5 and 6
> *General formula:*
> {code}
> Same structural issues as Case 3:
> For Case 5: The first loop finds cur because N%cur==0, which triggers
>   the modulo check before MAXIMIZE_UTILISATION is even evaluated.
>   The mode is irrelevant.
> For Case 6: When new > ⌊N/2⌋, the integer division plateau means
>   ⌊N/p⌋ = ⌊N/new⌋ = 1 for all p in [new, N-1]. MAXIMIZE_UTILISATION
>   cannot differentiate any candidate and falls through to the same
>   divisor (N) as EVENLY_SPREAD.
> {code}
> h2. 4.3. Cross-Cutting: Spurious *ScalingLimited* Warning Events
> *General formula:*
> {code}
> Preconditions:
>   1. divisors(N, [new, UB]) = ∅            // first loop fails
>   2. fallback_result == new                // rounding brings result back to new
> This happens when:
>   p_raw = ⌊N / (q + 1)⌋
>   N % p_raw ≠ 0 → result = p_raw + 1 == new
> Result: ScalingLimited event is emitted with "expected: X, actual: X"
>   → a completely misleading warning that pollutes the event log, even
>   though the scaling proceeds correctly.
> {code}
> *Concrete example:*
> - cur=2, new=3, N=7, bounds \[1, 5\]
> - First loop: *p=3 (7%3≠0), p=4 (7%4≠0), p=5 (7%5≠0)* → fails.
> - *q=⌊7/3⌋=2*, *p_raw=⌊7/3⌋=2*, *7%2≠0* → result = *3* = new.
> - ScalingLimited warning: "expected: 3, actual: 3" ← false alarm.
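> The preconditions above can be scanned for a given *N* and *UB* to see which 
> targets produce the false alarm. This Python sketch assumes 
> *parallelismLowerLimit = 1* and *UB ≤ N*; the function name 
> *spurious_warning_targets* is hypothetical.

```python
def spurious_warning_targets(n, ub):
    """Targets 'new' in [2, ub] where the first loop fails and the fallback
    rounds straight back to 'new' (so the warning reports X vs X)."""
    hits = []
    for new in range(2, ub + 1):
        if any(n % p == 0 for p in range(new, ub + 1)):
            continue  # first loop succeeds, the fallback is never reached
        q = n // new
        p_raw = n // (q + 1)
        result = p_raw if n % p_raw == 0 else p_raw + 1
        if result == new:
            hits.append(new)
    return hits


print(spurious_warning_targets(7, 5))
```

> For *N = 7* and *UB = 5* this reports the target 3 from the example above, 
> and 4 as well.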
> h1. 5. Impact
> - *Systematic, not isolated:* The formulas above show these issues are 
> determined by the divisor structure of N relative to *new*, *cur*, and *UB*. 
> Any partition count with sparse divisors triggers them. This includes all 
> primes and most semi-primes, which are common in production Kafka 
> deployments.
> - Combined with tight *VERTEX_MAX_PARALLELISM* settings, the alignment logic 
> can render the autoscaler completely unable to scale a vertex.
> - Direction inversion (Cases 2 and 6) can cause resource oscillation and 
> instability.
> - *MAXIMIZE_UTILISATION* mode fails to provide any benefit in the most 
> problematic scenarios (when *new > ⌊N/2⌋*), despite being specifically 
> designed for these use cases.
> - Spurious *ScalingLimited* events reduce signal-to-noise ratio in monitoring.
> h1. 6. Proposed Solution
> The new proposed solution covers:
> - *Direction safety by construction* - separate scale-up/scale-down paths 
> with structural guarantees.
> - *Single-responsibility modes* - the old combined EVENLY_SPREAD is 
> decomposed into distinct building blocks.
> - *Composable mode + fallback* - any mode paired with any fallback, no 
> combinatorial explosion.
> - *Open for extension* - new modes (e.g., *ADAPTIVE_DOWNWARD_SPREAD*, 
> *NEAREST_SPREAD*) only need to implement the 
> *searchesWithinRange*/*allowsOutsideRange* predicates and an *isAligned* 
> branch; new fallbacks only need a *toMode()* mapping. The orchestration logic 
> in *scaleUp()*/*scaleDown()* and the fallback framework remain untouched.
> h2. 6.1. Direction-aware *scaleUp()* / *scaleDown()* entry points
> Split the single alignment code path into two direction-aware methods:
> - *scaleUp()* → guarantees the result is strictly *above* 
> *currentParallelism*, or returns *currentParallelism* (sentinel = "no aligned 
> value found").
> - *scaleDown()* → guarantees the result is strictly *below* 
> *currentParallelism*, or returns *currentParallelism*.
> The *scale()* method dispatches to the appropriate one based on 
> *newParallelism > currentParallelism* vs *newParallelism < 
> currentParallelism*.
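> The direction contract can be sketched as follows. This is an illustrative 
> Python model, not the proposed Java code: the two alignment strategies are 
> passed in as plain callables, and the guard is placed at the dispatch point 
> purely for brevity.

```python
def scale(cur, new, scale_up, scale_down):
    """Dispatch on direction; never return a value on the wrong side of cur.

    scale_up / scale_down are callables (cur, new) -> aligned parallelism.
    Returning cur is the sentinel for "no aligned value found".
    """
    if new > cur:
        result = scale_up(cur, new)
        return result if result > cur else cur
    if new < cur:
        result = scale_down(cur, new)
        return result if result < cur else cur
    return cur


# An alignment step that would have inverted a scale-up (6 -> 5) is rejected.
print(scale(6, 8, lambda cur, new: 5, lambda cur, new: new))
```

> With this shape, the inversions from Cases 2 and 6 collapse into the 
> sentinel instead of reaching *detectBlockScaling* with the wrong direction.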
> h2. 6.2. Three-phase alignment algorithm per mode
> Each mode's algorithm is structured into three phases (first match wins):
> || Phase || Description || Active for ||
> | *Phase 1* (within range) | Divisor search between *currentParallelism* and *newParallelism*. Scale-up searches downward from target; scale-down searches upward. | *EVENLY_SPREAD*, *OPTIMIZE_RESOURCE_UTILIZATION* |
> | *Phase 2* (upward outside range) | Searches upward from *newParallelism* to *upper align limit*. Accepts exact divisors; *MAXIMIZE_UTILISATION* additionally accepts values where per-subtask load decreases. In scale-down, capped at *min(currentParallelism, upper align limit)* to prevent crossing. | *MAXIMIZE_UTILISATION*, *ADAPTIVE_UPWARD_SPREAD* |
> | *Phase 3* (relaxed downward fallback) | Searches downward from *newParallelism* for the boundary where per-subtask load increases, then snaps up to the nearest divisor-aligned value. In scale-up, a *direction guard* rejects results ≤ *currentParallelism*. | *MAXIMIZE_UTILISATION*, *ADAPTIVE_UPWARD_SPREAD* |
> Phase 2 and Phase 3 share the same *allowsOutsideRange(mode)* guard, and 
> their logic is identical for scale-up and scale-down. They are extracted 
> into a single shared method *applyOutsideRangeSearch()*, parameterized by:
> - *phase2UpperBound* → *upper align limit* for scale-up; 
> *min(currentParallelism, upper align limit)* for scale-down.
> - *isScaleUp* → enables the direction guard in Phase 3.
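> Phase 1 is the simplest of the three phases and can be sketched directly 
> from the table. The Python below is an assumption-laden illustration: it 
> takes the range to include *newParallelism* and exclude 
> *currentParallelism*, and uses a *strict* flag to switch between the 
> *EVENLY_SPREAD* test (*N % p == 0*) and the *OPTIMIZE_RESOURCE_UTILIZATION* 
> test (*p ≤ N*). The name *phase1* is hypothetical.

```python
def phase1(cur, new, n, strict=True):
    """Within-range search; returns cur as the 'nothing found' sentinel."""

    def aligned(p):
        return n % p == 0 if strict else p <= n

    if new > cur:
        candidates = range(new, cur, -1)  # scale-up: new, new-1, ..., cur+1
    else:
        candidates = range(new, cur)      # scale-down: new, new+1, ..., cur-1
    return next((p for p in candidates if aligned(p)), cur)


print(phase1(3, 7, 12))                # strict scale-up
print(phase1(5, 8, 10))                # strict scale-up, no divisor in range
print(phase1(5, 8, 10, strict=False))  # relaxed scale-up
```

> Because every candidate lies strictly between the current value and the 
> target (or on the target), the result cannot cross *currentParallelism* in 
> either direction.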
> h2. 6.3. Search ranges diagram
> {code:none}
>                    PARALLELISM ALIGNMENT — SEARCH RANGES PER MODE
> ══════════════════════════════════════════════════════════════════════════════════════
>   Complete number line with all key barriers:
>   lower align      new ↓          current          new ↑         upper align
>     limit¹         target²       parallelism       target³         limit⁴
>       │               │               │               │               │
>   ────┴───────────────┴───────────────┴───────────────┴───────────────┴──►
>       │               │               │               │               │
>       │  ◄──── scale-down zone ─────► │ ◄────── scale-up zone ─────►  │
>   ¹ parallelismLowerLimit — floor for alignment results
>   ² newParallelism when scaling down
>   ³ newParallelism when scaling up
>   ⁴ upperBoundForAlignment = min(N, min(maxParallelism, parallelismUpperLimit))
>     N = numKeyGroupsOrPartitions (key groups or source partitions)
>   Legend:  ◄ = downward search direction    ► = upward search direction
>            ├──────┤ = search range          P1/P2/P3 = phase (first match wins)
> ═══════════════════════════════════════════════════════════════════════════════════════════
>   SCALE-UP  (newParallelism > currentParallelism)
> ═══════════════════════════════════════════════════════════════════════════════════════════
>                                      currentP          new(↑)       upper align
>                                                                        limit
>                                         │                 │              │
>                                    ─────┼─────────────────┼──────────────┼──►
>   EVENLY_SPREAD          P1:            ├────────◄────────┤                    N % p == 0
>   OPTIMIZE_RESOURCE_     P1:            ├────────◄────────┤                    p ≤ N
>     UTILIZATION
>   · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>   MAXIMIZE_UTILISATION   P2:                              ├────────►─────┤     N%p==0 ∨ N/p<N/new
>                          P3:            ├────────◄────────┤                    snap to load boundary
>   · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>   ADAPTIVE_UPWARD_       P2:                              ├────────►─────┤     N % p == 0
>     SPREAD               P3:            ├────────◄────────┤                    snap to load boundary
>                          P3 direction guard: result must be > currentP, otherwise discarded
> ═══════════════════════════════════════════════════════════════════════════════════════════
>   SCALE-DOWN  (newParallelism < currentParallelism)
> ═══════════════════════════════════════════════════════════════════════════════════════════
>                                   lower align        new(↓)              currentP
>                                      limit
>                                        │                │                    │
>                                   ─────┼────────────────┼────────────────────┼──►
>   EVENLY_SPREAD          P1:                            ├────────►───────────┤    N % p == 0
>   OPTIMIZE_RESOURCE_     P1:                            ├────────►───────────┤    p ≤ N
>     UTILIZATION
>   · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>   MAXIMIZE_UTILISATION   P2:                            ├────────►───────────┤⁵   N%p==0 ∨ N/p<N/new
>                          P3:           ├───────◄────────┤                         snap to load boundary
>   · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>   ADAPTIVE_UPWARD_       P2:                            ├────────►───────────┤⁵   N % p == 0
>     SPREAD               P3:           ├───────◄────────┤                         snap to load boundary
>   ⁵ Phase 2 upper bound capped at min(currentP, upper align limit) to prevent direction inversion
>     Phase 3 results are inherently ≤ new(↓) < currentP → no direction guard needed
> ═══════════════════════════════════════════════════════════════════════════════════════════
>   MODE-THEN-FALLBACK FLOW  (applies to both scale-up and scale-down)
> ═══════════════════════════════════════════════════════════════════════════════════════════
>   ┌─────────────────────┐     found?     ┌──────────┐
>   │ Run primary mode's  ├──── YES ──────►│  Return  │
>   │ full P1 → P2 → P3   │                │  result  │
>   └─────────┬───────────┘                └──────────┘
>             │ NO (sentinel)
>             ▼
>   ┌─────────────────────┐     found?     ┌──────────┐
>   │ Run fallback mode's ├──── YES ──────►│  Return  │
>   │ full P1 → P2 → P3   │                │  result  │
>   └─────────┬───────────┘                └──────────┘
>             │ NO (sentinel)
>             ▼
>   ┌─────────────────────────────────────────────────┐
>   │ WARN log (compliant vs last-resort check)       │
>   │ Emit SCALING_LIMITED event                      │
>   │ Return currentParallelism (scaling blocked)     │
>   └─────────────────────────────────────────────────┘
> {code}
> h2. 6.4. New modes: *EVENLY_SPREAD* (redefined) and 
> *OPTIMIZE_RESOURCE_UTILIZATION*
> The original implementation offered only two modes (*EVENLY_SPREAD* and 
> *MAXIMIZE_UTILISATION*), and *EVENLY_SPREAD* was misleadingly named: it 
> actually ran both an upward outside-range search *and* a downward relaxed 
> fallback (now explicitly named Phase 2 and Phase 3), 
> making it behave like a combined strategy rather than a strict divisor-only 
> mode. This made it impossible for users to select a truly conservative 
> alignment that blocks scaling when no divisor exists in the safe range.
> The new design redefines *EVENLY_SPREAD* and adds 
> *OPTIMIZE_RESOURCE_UTILIZATION* to cover the full spectrum of alignment 
> strictness:
> || Mode || Strictness || Search behavior || When to use ||
> | *EVENLY_SPREAD* (redefined) | Strict | Within-range only (Phase 1). Accepts only exact divisors of N. Blocks scaling if none found. | Data-sensitive workloads where uneven distribution is unacceptable (e.g., windowed aggregations, partitioned state). |
> | *OPTIMIZE_RESOURCE_UTILIZATION* (new) | Minimal | Within-range only (Phase 1). Accepts any *p ≤ N* (every subtask has at least one item). | Cost-sensitive workloads where resource efficiency matters more than even distribution. The autoscaler's computed target is used almost as-is. |
> Together with the existing modes, the full design space is:
> || Mode || Alignment strictness || Phases used || Search direction ||
> | *EVENLY_SPREAD* | Strict (divisor only) | P1 | Within range only |
> | *OPTIMIZE_RESOURCE_UTILIZATION* | Minimal (*p ≤ N*) | P1 | Within range only |
> | *MAXIMIZE_UTILISATION* | Relaxed (*N/p < N/new*) | P2 + P3 | Upward outside range + downward fallback |
> | *ADAPTIVE_UPWARD_SPREAD* | Combined (divisor → relaxed fallback) | P2 + P3 | Upward outside range + downward fallback |
> h2. 6.5. *ADAPTIVE_UPWARD_SPREAD* mode (rename + new default)
> The old *EVENLY_SPREAD* behavior, which combined a divisor search with an 
> implicit *MAXIMIZE_UTILISATION*-style fallback and a downward relaxed search, 
> is captured by the new *ADAPTIVE_UPWARD_SPREAD* mode. This name explicitly 
> conveys that it searches *above* the target and adapts between strict and 
> relaxed alignment. It is now the *default mode*, preserving backward 
> compatibility with the old *EVENLY_SPREAD* behavior while giving it a more 
> accurate name.
> h2. 6.6. Composable fallback configuration
> With properly separated modes, strict modes like *EVENLY_SPREAD* will block 
> scaling when no divisor exists in the safe range. Previously, the relaxed 
> fallback was implicitly baked into *EVENLY_SPREAD* with no way to control it. 
> The new design introduces a separate fallback configuration:
> {code:none}
> job.autoscaler.scaling.key-group.partitions.adjust.mode.fallback
> {code}
> *Why this is needed:*
> - *Separation of concerns:* The primary mode defines the *preferred* 
> alignment strategy; the fallback defines what to do when it fails. Mixing 
> both into a single enum creates combinatorial explosion and unclear semantics.
> - *User control:* Different workloads need different trade-offs. A user might 
> want strict *EVENLY_SPREAD* alignment as the primary strategy but accept 
> *OPTIMIZE_RESOURCE_UTILIZATION* as a fallback to avoid blocking scaling 
> entirely. Without a fallback config, the only options are "strict and block" 
> or "relaxed from the start."
> - *Extensibility:* New modes can be added without creating N × M combined 
> enum values. Any mode can be paired with any fallback.
> *Enum:* *KeyGroupOrPartitionsAdjustFallback* with values:
> || Value || Behavior ||
> | *DEFAULT* | Uses the mode's built-in fallback. Only *ADAPTIVE_UPWARD_SPREAD* has a built-in fallback (*MAXIMIZE_UTILISATION*). All others resolve to *NONE*. |
> | *NONE* | Blocks scaling when no aligned value is found. |
> | *ADAPTIVE_UPWARD_SPREAD* | Delegates to the full *ADAPTIVE_UPWARD_SPREAD* algorithm as a second pass. |
> | *MAXIMIZE_UTILISATION* | Falls back to the relaxed *MAXIMIZE_UTILISATION* search. |
> | *OPTIMIZE_RESOURCE_UTILIZATION* | Falls back to accepting any value where *p ≤ N*. |
> The *toMode()* method on the fallback enum maps each value to the 
> corresponding *KeyGroupOrPartitionsAdjustMode*, so the same 
> *applyScaleUpMode* / *applyScaleDownMode* methods are reused for both the 
> primary and the fallback pass without duplicating code.
> *Example configurations:*
> || Use case || Mode || Fallback || Behavior ||
> | Backward-compatible default | *ADAPTIVE_UPWARD_SPREAD* | *DEFAULT* → *MAXIMIZE_UTILISATION* | Tries divisor above target, falls back to relaxed load-decrease search |
> | Strict with graceful degradation | *EVENLY_SPREAD* | *OPTIMIZE_RESOURCE_UTILIZATION* | Tries exact divisor in range; if none, accepts any *p ≤ N* |
> | Strict, block on failure | *EVENLY_SPREAD* | *NONE* | Exact divisor or nothing; scaling blocked if no divisor in range |
> | Maximum resource efficiency | *OPTIMIZE_RESOURCE_UTILIZATION* | *NONE* | Always accepts the autoscaler's target (as long as *p ≤ N*) |
> | Strict with upward fallback | *EVENLY_SPREAD* | *ADAPTIVE_UPWARD_SPREAD* | Tries divisor in range; if none, searches above target with full adaptive algorithm |
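> The fallback resolution described in this section can be sketched as below. 
> This is a Python model of the proposed enums rather than the Java 
> implementation; *resolve_fallback* is a hypothetical stand-in for the 
> *toMode()* mapping, and *None* stands for "no fallback pass".

```python
from enum import Enum, auto


class Mode(Enum):
    EVENLY_SPREAD = auto()
    OPTIMIZE_RESOURCE_UTILIZATION = auto()
    MAXIMIZE_UTILISATION = auto()
    ADAPTIVE_UPWARD_SPREAD = auto()


class Fallback(Enum):
    DEFAULT = auto()
    NONE = auto()
    ADAPTIVE_UPWARD_SPREAD = auto()
    MAXIMIZE_UTILISATION = auto()
    OPTIMIZE_RESOURCE_UTILIZATION = auto()


def resolve_fallback(primary, fallback):
    """Map a fallback setting to the mode used for the second pass."""
    if fallback is Fallback.DEFAULT:
        # Only ADAPTIVE_UPWARD_SPREAD carries a built-in fallback.
        if primary is Mode.ADAPTIVE_UPWARD_SPREAD:
            return Mode.MAXIMIZE_UTILISATION
        return None
    if fallback is Fallback.NONE:
        return None
    # Remaining fallback values share their name with a mode.
    return Mode[fallback.name]


print(resolve_fallback(Mode.ADAPTIVE_UPWARD_SPREAD, Fallback.DEFAULT))
print(resolve_fallback(Mode.EVENLY_SPREAD, Fallback.DEFAULT))
```

> Keeping the fallback as a separate enum that resolves to an existing mode is 
> what lets any primary mode be combined with any fallback without adding 
> combined enum values.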
> h2. 6.7. Proposed Scaling Flow
> {code:none}
> ┌──────────────────────────────────┐
> │  applyScaleUpMode(primaryMode)   │  ← self-contained: Phase 1 + 2 + 3
> │  returns result or curP          │
> └──────────┬───────────────────────┘
>            │ result == curP?
>            ▼ yes
> ┌──────────────────────────────────┐
> │  applyScaleUpMode(fallbackMode)  │  ← same method, different mode
> │  returns result or curP          │
> └──────────┬───────────────────────┘
>            │ result == curP?
>            ▼ yes
>    emit SCALING_LIMITED event
>    return curP
> {code}
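> The flow above amounts to trying at most two modes and treating 
> *currentParallelism* as the sentinel. A minimal Python sketch, assuming a 
> hypothetical *apply_mode* callable that stands in for *applyScaleUpMode* / 
> *applyScaleDownMode* and returns *cur* when it finds nothing:

```python
def align_with_fallback(cur, primary, fallback, apply_mode):
    """Return (parallelism, limited).

    limited is True when both passes returned the sentinel, which is the
    point where the SCALING_LIMITED event would be emitted.
    """
    for mode in (primary, fallback):
        if mode is None:
            continue  # no fallback configured
        result = apply_mode(mode)
        if result != cur:
            return result, False
    return cur, True


# Toy stand-in: the strict mode finds nothing (returns cur=5), the relaxed
# mode finds 8.
outcomes = {"strict": 5, "relaxed": 8}
print(align_with_fallback(5, "strict", "relaxed", outcomes.get))
print(align_with_fallback(5, "strict", None, outcomes.get))
```

> The first call returns the fallback's result; the second returns the 
> sentinel together with the flag that scaling was blocked.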
> h2. 6.8. Compliance-aware warning logging
> When both the primary mode and fallback fail to find an aligned value, a 
> *WARN* log distinguishes between two cases:
> - *Compliant:* *currentParallelism* is itself aligned with the mode/fallback; 
> there is simply no *better* aligned value in the scaling direction.
> - *Last resort:* *currentParallelism* does *not* comply with the mode or 
> fallback; it is returned purely to preserve the scaling direction.
> This helps operators diagnose whether the autoscaler is stuck at a good value 
> (just no room to scale) vs stuck at a bad value (alignment constraint is too 
> restrictive for the current state).
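> The distinction can be sketched as a predicate check on 
> *currentParallelism*. The Python below is only an illustration: it assumes 
> "compliant" means aligned under at least one of the configured mode and 
> fallback, and the helper names are hypothetical.

```python
def is_divisor(p, n):
    """EVENLY_SPREAD-style alignment: p divides n exactly."""
    return n % p == 0


def fits(p, n):
    """OPTIMIZE_RESOURCE_UTILIZATION-style alignment: no idle subtask."""
    return p <= n


def blocked_reason(cur, n, predicates):
    """Classify why scaling stayed at cur once both passes found nothing."""
    if any(aligned(cur, n) for aligned in predicates):
        return "compliant"
    return "last resort"


print(blocked_reason(5, 10, [is_divisor]))  # 5 divides 10
print(blocked_reason(6, 10, [is_divisor]))  # 6 does not divide 10
```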



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
