[ https://issues.apache.org/jira/browse/SPARK-56627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kent Yao reassigned SPARK-56627:
--------------------------------

    Assignee: Kent Yao

> DecimalAggregates: peel scale-preserving widening Cast for Sum and Average
> --------------------------------------------------------------------------
>
>                 Key: SPARK-56627
>                 URL: https://issues.apache.org/jira/browse/SPARK-56627
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Kent Yao
>            Assignee: Kent Yao
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Background
> The Catalyst optimizer rule {{DecimalAggregates}} (in
> {{org.apache.spark.sql.catalyst.optimizer}}) rewrites
> {{Sum(d : DECIMAL(p, s))}} with {{p + 10 <= MAX_LONG_DIGITS (=18)}} into
> {{MakeDecimal(Sum(UnscaledValue(d)), p + 10, s)}}, replacing the generic
> {{CheckOverflow}} decimal path with a Long-arithmetic fast path that
> materially reduces per-row cost on aggregate and window paths. A symmetric
> Long-then-Double fast path exists for {{Avg(d : DECIMAL(p, s))}} when
> {{p + 4 <= MAX_DOUBLE_DIGITS (=15)}}.
> The fast paths are bypassed today when the input is wrapped in a
> *scale-preserving widening* {{Cast}}, i.e. {{Sum(Cast(d AS DECIMAL(p', s')))}}
> with {{p' > p}} and {{s' == s}}: the guard is evaluated against the declared
> precision {{p'}} (e.g. {{17 + 10 > 18}} for a {{dec(7, 2)}} column widened to
> {{dec(17, 2)}}), even though the inner expression's effective precision
> {{p}} would satisfy the precondition. This is a common shape in
> BI-tool-generated SQL, where the inner column is normalised to a wider
> declared decimal before aggregation.
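To make the bypass concrete, here is a small Java sketch that models the existing guards as plain predicates over the declared precision of the aggregate's child. It is not Spark code: the class and method names are hypothetical, only the constants 18 and 15 come from the text above, and it assumes (as described above) that the guard reads the child's declared precision rather than the precision of anything underneath a Cast.

```java
// Hypothetical model of the existing DecimalAggregates guards; only the
// constants come from the ticket, the class and method names are illustrative.
final class ExistingFastPathGuards {
    static final int MAX_LONG_DIGITS = 18;   // value quoted for MAX_LONG_DIGITS
    static final int MAX_DOUBLE_DIGITS = 15; // value quoted for MAX_DOUBLE_DIGITS

    // Existing SUM arm: evaluated on the declared precision of Sum's child.
    static boolean sumFastPath(int declaredPrecision) {
        return declaredPrecision + 10 <= MAX_LONG_DIGITS;
    }

    // Existing AVG arm: evaluated on the declared precision of Average's child.
    static boolean avgFastPath(int declaredPrecision) {
        return declaredPrecision + 4 <= MAX_DOUBLE_DIGITS;
    }

    public static void main(String[] args) {
        // Sum(d : dec(7, 2)): 7 + 10 = 17 <= 18, so the guard passes.
        System.out.println(sumFastPath(7));
        // Sum(Cast(d AS dec(17, 2))): the guard sees p' = 17, and 17 + 10 > 18.
        System.out.println(sumFastPath(17));
        // Avg(Cast(d AS dec(20, 2))) fails, although the inner dec(7, 2) would pass.
        System.out.println(avgFastPath(20) + " " + avgFastPath(7));
    }
}
```

With the q18 shape, `sumFastPath(7)` holds while `sumFastPath(17)` does not, which is the gap the peel is meant to close.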
> h2. Proposal
> Extend {{DecimalAggregates}} with two new optimizer arms that peel a 
> scale-preserving widening decimal {{Cast}} around the {{Sum}} / {{Average}} 
> child, so the existing Long-backed fast path fires on the original narrow 
> input:
> * *SUM arm* -- rewrite {{Sum(Cast(inner : dec(p, s), dec(p', s)))}} with
> {{p' >= p}} and {{p + 10 <= 18}} into
> {{Cast(MakeDecimal(Sum(UnscaledValue(inner)), p + 10, s), DecimalType.bounded(p' + 10, s))}}.
> The outer compensating Cast preserves the user-visible output type exactly.
> * *AVG arm* -- the same peel on {{Average}}, with the outer compensating Cast
> restoring {{DecimalType.bounded(p' + 4, s + 4)}}, and a *strict-subset guard*
> {{p <= AVG_PEEL_MAX_INNER_PRECISION (= 7)}}. This conservative narrowing keeps
> the new arm's trigger surface inside the existing
> {{Average(decimal(p<=7, s))}} fast path's surface, so SPARK-37024
> (Double-regime silent precision loss) is *not* amplified by this change. The
> mathematical divide-guard limit is wider ({{p <= 11}}); the {{<= 7}} bound is
> documented in code and locked by an invariant guard test.
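A value-level sketch of the two rewrites follows, assuming every input already carries scale s. It models UnscaledValue, the Long accumulator and MakeDecimal with java.math.BigDecimal and is not Spark code; the AVG method only approximates the existing Double-divide fast path, assuming it averages the unscaled values as a double, divides by 10^s and rounds HALF_UP to scale s + 4. All names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

// Hypothetical value-level model of the SUM and AVG peel arms (not Spark code).
final class PeelRewriteSketch {
    // SUM arm: Sum(UnscaledValue(inner)) on a long, then MakeDecimal(_, p + 10, s).
    // The outer Cast to DecimalType.bounded(p' + 10, s) only widens the declared
    // precision, so it does not change the value. Inputs must already have scale s.
    static BigDecimal peeledSum(List<BigDecimal> inner, int s) {
        long acc = 0L;
        for (BigDecimal d : inner) {
            // addExact throws instead of silently wrapping
            acc = Math.addExact(acc, d.setScale(s).unscaledValue().longValueExact());
        }
        return BigDecimal.valueOf(acc, s);
    }

    // Oracle: exact sum of the widened values. A scale-preserving widening Cast
    // leaves each value unchanged, so this is a plain BigDecimal sum.
    static BigDecimal widenedSumOracle(List<BigDecimal> inner, int s) {
        BigDecimal acc = BigDecimal.ZERO.setScale(s);
        for (BigDecimal d : inner) {
            acc = acc.add(d.setScale(s));
        }
        return acc;
    }

    // AVG arm (approximation): average of the unscaled values as a double,
    // divided by 10^s, rounded HALF_UP to scale s + 4. p' never appears here,
    // so peeled and unpeeled plans compute the same value; only the declared
    // output precision differs.
    static BigDecimal peeledAvg(List<BigDecimal> inner, int s) {
        double sum = 0.0;
        for (BigDecimal d : inner) {
            sum += d.setScale(s).unscaledValue().longValueExact();
        }
        double avg = sum / inner.size() / Math.pow(10.0, s);
        return BigDecimal.valueOf(avg).setScale(s + 4, RoundingMode.HALF_UP);
    }
}
```

For inputs 1.25, 2.50 and -0.75 at scale 2, `peeledSum` and `widenedSumOracle` both yield 3.00 and `peeledAvg` yields 1.000000; since p' enters neither computation, the outer Cast only has to restore the declared type.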
> Both arms share a single private {{WidenedDecimalChild}} extractor that
> explicitly refuses to unwrap {{CheckOverflow}} (preserving row-level overflow
> semantics) or {{PromotePrecision}}.
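The extractor's matching rules can be sketched over a hypothetical mini expression tree in Java. DecExpr, ColumnRef, CastExpr, CheckOverflowExpr and PromotePrecisionExpr are stand-ins invented for this illustration, not Catalyst classes, and every node is assumed to be decimal-typed, so the non-decimal-source exclusion is not modelled.

```java
import java.util.Optional;

// Hypothetical mini expression tree (NOT Catalyst's classes): every node has a
// declared decimal type dec(precision, scale).
abstract class DecExpr {
    final int precision;
    final int scale;
    DecExpr(int precision, int scale) { this.precision = precision; this.scale = scale; }
}

// A leaf column of type dec(precision, scale).
final class ColumnRef extends DecExpr {
    ColumnRef(int precision, int scale) { super(precision, scale); }
}

// Cast(child AS dec(precision, scale)).
final class CastExpr extends DecExpr {
    final DecExpr child;
    CastExpr(DecExpr child, int precision, int scale) { super(precision, scale); this.child = child; }
}

// Stand-ins for the two wrappers the extractor must not look through.
final class CheckOverflowExpr extends DecExpr {
    final DecExpr child;
    CheckOverflowExpr(DecExpr child, int precision, int scale) { super(precision, scale); this.child = child; }
}

final class PromotePrecisionExpr extends DecExpr {
    final DecExpr child;
    PromotePrecisionExpr(DecExpr child) { super(child.precision, child.scale); this.child = child; }
}

final class WidenedDecimalChild {
    // Matches only Cast(inner : dec(p, s) AS dec(p', s)) with p' >= p and returns inner.
    // Any other top-level node, including CheckOverflow and PromotePrecision, is
    // refused, so the caller keeps the original (unpeeled) child.
    static Optional<DecExpr> unapply(DecExpr e) {
        if (!(e instanceof CastExpr)) {
            return Optional.empty();
        }
        CastExpr cast = (CastExpr) e;
        DecExpr inner = cast.child;
        boolean scalePreserving = cast.scale == inner.scale;
        boolean widening = cast.precision >= inner.precision;
        return (scalePreserving && widening) ? Optional.of(inner) : Optional.empty();
    }
}
```

Keeping the refusal in one shared extractor means both arms inherit the same conservative behaviour around CheckOverflow without duplicating the check.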
> h2. Guard asymmetry
> ||arm||guard||relation to existing arm||rationale||
> |SUM widened-Cast|{{p + 10 <= MAX_LONG_DIGITS (18)}}|identical to the existing SUM fast arm|Long accumulator never wraps in the peel domain; the existing SUM safety bound applies unchanged|
> |AVG widened-Cast|{{p <= AVG_PEEL_MAX_INNER_PRECISION (7)}}|tighter than the existing AVG arm's mathematical limit ({{p + 4 <= 15}}, i.e. {{p <= 11}})|conservative narrowing: stays strictly inside the existing AVG arm's actual surface, so SPARK-37024 Double-regime symptoms cannot be amplified by this change. Relaxing past 7 requires re-evaluating SPARK-37024 first.|
> Two invariant guard tests in {{DecimalAggregatesSuite}} pin the boundary:
> {{SUM(CAST(dec(9,2) AS dec(19,2)))}} (9 + 10 = 19 > 18) and
> {{AVG(CAST(dec(8,2) AS dec(20,2)))}} (8 > 7) must both stay unpeeled today;
> future code that relaxes either guard without re-deriving the safety bound
> will fail these tests in CI.
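The guard table and the two invariant guard tests can be expressed as plain predicates. This is a hedged Java sketch: the constant values come from the guard table, while PeelGuards and its method names are hypothetical and do not claim to match the actual rule's code.

```java
// Hypothetical predicates mirroring the guard table; only the constant values
// are taken from the ticket, everything else is illustrative.
final class PeelGuards {
    static final int MAX_LONG_DIGITS = 18;
    static final int AVG_PEEL_MAX_INNER_PRECISION = 7;

    // Shape shared by both arms: Cast(inner : dec(p, s) AS dec(pWide, sWide))
    // with the scale preserved and the precision not narrowed.
    static boolean scalePreservingWidening(int p, int s, int pWide, int sWide) {
        return sWide == s && pWide >= p;
    }

    // SUM arm: the existing Long-accumulator bound, applied to the inner p.
    static boolean sumPeels(int p, int s, int pWide, int sWide) {
        return scalePreservingWidening(p, s, pWide, sWide) && p + 10 <= MAX_LONG_DIGITS;
    }

    // AVG arm: deliberately tighter than the p + 4 <= 15 (p <= 11) divide limit.
    static boolean avgPeels(int p, int s, int pWide, int sWide) {
        return scalePreservingWidening(p, s, pWide, sWide) && p <= AVG_PEEL_MAX_INNER_PRECISION;
    }
}
```

Here `sumPeels(9, 2, 19, 2)` is false (9 + 10 = 19 > 18) and `avgPeels(8, 2, 20, 2)` is false (8 > 7), matching the two invariant guard tests, while the q18 shape `sumPeels(7, 2, 17, 2)` is true.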
> h2. Window arm
> Both arms extend symmetrically to {{WindowExpression(Sum/Average(...))}} 
> (mirroring SPARK-14664's original Window extension of {{DecimalAggregates}}). 
> However, no production-benefit claim is made for the Window arm: the peel
> rule is structurally identical and trivially safe under the same guards, but
> no TPC-DS / TPC-H query exercising {{SUM(CAST decimal) OVER win}} is known.
> The included benchmark makes no Window speedup claim; its Window cases
> (B1-B2) stay a no-op pair confirming the on/off symmetry today.
> Symmetric extension is preferred over {{Aggregate}}-only on engineering
> grounds: it avoids introducing a rule-arm asymmetry that future reviewers
> would need to special-case against the existing Aggregate/Window dual-arm
> shape from SPARK-3933 / SPARK-14664.
> h2. SQLConf gate
> *None.* Precedent: SPARK-3933 (the original SUM Long fast-path optimisation)
> and SPARK-14664 (the Window extension) both landed without a SQLConf gate.
> The peel arms here are at least as conservative as those existing arms
> (identical guard for SUM; narrower trigger surface for AVG). SPARK-24802
> ({{spark.sql.optimizer.excludedRules}}) provides the standard runtime escape
> hatch, but it works at rule granularity: excluding {{DecimalAggregates}}
> disables the existing fast paths along with the new arms.
> h2. Verification
> * {{DecimalAggregatesSuite}} (catalyst, plan-level): 30 tests covering 
> fast-path SUM/AVG no-Cast regression, widened-Cast SUM/AVG peel positives, 
> narrowing/scale-change/{{CheckOverflow(Cast)}} negatives, {{Literal(null)}} 
> and empty-{{LocalRelation}} plan-shape witnesses, idempotence + RuleExecutor 
> convergence, AVG band {{[8, 11]}} boundary, and the Long-accumulator / 
> Double-regime safety boundary invariant guards.
> * {{ScalaCheck}} plan-shape PBT (catalyst): structural invariants over the 
> {{(p, p', s)}} peel-eligible lattice.
> * {{ScalaCheck}} numerical-equivalence PBT (sql/core): SUM PBT vs a
> {{java.math.BigDecimal}} oracle over {{(p in [1, 8], p' in [max(p+1, 9), 28], s in [0, p], n in [1, 1000])}};
> AVG PBT vs unpeeled {{AVG(x)}} (the peeled and unpeeled plans share the same
> Double-divide path; only the outer compensating Cast's precision differs).
> The {{p'}} wider-than-{{11}} regime is exercised by a separate shape witness
> asserting the expected {{DecimalType.bounded(p' + 4, s + 4)}} output schema.
> * Plan stability: {{TPCDSV1_4PlanStabilitySuite}} and 
> {{TPCDSV1_4PlanStabilityWithStatsSuite}} regenerated for q18 (the canonical 
> {{SUM(CAST(dec(7, 2) AS dec(17, 2)))}} workload). {{TPCDSV2_7_*}} suites 
> unchanged (v2.7 q18a is a different shape and has no decimal widened-Cast 
> site).
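A rough analogue of the SUM numerical-equivalence PBT can be written in plain Java. This sketch substitutes java.util.Random for ScalaCheck and a hand-written model of the peeled plan for Spark execution; it draws (p, p', s, n) from the lattice quoted above, and the class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.util.Random;

// Randomized sketch of the SUM numerical-equivalence property (not the real PBT).
final class SumPeelProperty {
    static final int MAX_PRECISION = 38; // cap applied by DecimalType.bounded

    static boolean trial(Random rnd) {
        int p = 1 + rnd.nextInt(8);                        // p  in [1, 8]
        int pLow = Math.max(p + 1, 9);
        int pWide = pLow + rnd.nextInt(28 - pLow + 1);     // p' in [max(p+1, 9), 28]
        int s = rnd.nextInt(p + 1);                        // s  in [0, p]
        int n = 1 + rnd.nextInt(1000);                     // n  in [1, 1000]
        long bound = (long) Math.pow(10, p);               // dec(p, s) => |unscaled| < 10^p

        long acc = 0L;                                     // Sum(UnscaledValue(inner))
        BigDecimal oracle = BigDecimal.ZERO.setScale(s);   // exact sum of the widened values
        for (int i = 0; i < n; i++) {
            long unscaled = (long) (rnd.nextDouble() * (2 * bound - 1)) - (bound - 1);
            acc = Math.addExact(acc, unscaled);            // throws if the long would overflow
            oracle = oracle.add(BigDecimal.valueOf(unscaled, s));
        }
        BigDecimal peeled = BigDecimal.valueOf(acc, s);    // MakeDecimal(_, p + 10, s)
        int outerPrecision = Math.min(pWide + 10, MAX_PRECISION); // DecimalType.bounded(p' + 10, s)
        // Equal to the oracle, and fits both the inner and the outer declared types.
        return peeled.equals(oracle)
                && peeled.precision() <= p + 10
                && peeled.precision() <= outerPrecision;
    }

    static boolean run(long seed, int trials) {
        Random rnd = new Random(seed);
        for (int t = 0; t < trials; t++) {
            if (!trial(rnd)) {
                return false;
            }
        }
        return true;
    }
}
```

`SumPeelProperty.run(42L, 200)` runs 200 trials with a fixed seed; unlike ScalaCheck, this sketch does no shrinking, so a false return only signals that some trial failed.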
> h2. Bench
> {{DecimalAggregatesBenchmark}} is added in this change as a single file
> covering SUM Aggregate (Section A, four cases A1-A4), SUM Window (Section B,
> two cases B1-B2), and AVG Aggregate (Section C, four cases C1-C4). GitHub
> Actions results on three JDKs (17 / 21 / 25, AMD EPYC 9V74) show:
> * SUM A1 / A3 / A4 (peel-eligible Aggregate cases): *~1.1x* speedup.
> * SUM A2 ({{p + 10 > 18}}, peel rejected by guard): 1.0x neutral.
> * SUM B1 / B2 (Window): 1.0x neutral, as stated in the Window arm section.
> * AVG C1 / C2 / C3 / C4 (all peel-eligible under the {{p <= 7}} guard): 
> *~1.1x* speedup uniformly.
> h2. Out of scope
> * Scale-changing Cast (Phase 2+ requires range-preserving reasoning).
> * Narrowing Cast (correctness not preserved).
> * Cast from non-decimal source (separate mechanism).
> * {{Sum(a + Cast(b, ...))}} -- arithmetic under Cast.
> * {{MIN}} / {{MAX}} (covered by a different code path).
> * Unwrapping {{CheckOverflow(Cast(...))}} (preserved as conservative).
> * Relaxing AVG guard past {{p <= 7}} (deferred until SPARK-37024 root fix).
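The two correctness-related exclusions can be illustrated at the value level. This Java sketch is not Spark code and its method names are hypothetical; it only shows that a scale-changing Cast alters the unscaled value the peel would sum, and that a narrowing Cast can reject values the inner type accepts.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Illustrative value-level sketch (not Spark code) of two excluded Cast shapes.
final class OutOfScopeCasts {
    // Unscaled value of v after a Cast to scale newScale. For a scale-changing
    // Cast it differs from the inner unscaled value by a factor of
    // 10^(newScale - oldScale), so Sum(UnscaledValue(inner)) is no longer the
    // quantity the Cast's output type expects.
    static long unscaledAt(BigDecimal v, int newScale) {
        return v.setScale(newScale, RoundingMode.HALF_UP).unscaledValue().longValueExact();
    }

    // Whether v is representable as dec(precision, scale). A narrowing Cast can
    // fail this for some rows (Spark then yields null, or raises an error in
    // ANSI mode), so summing the un-cast inner values would change the result.
    static boolean fits(BigDecimal v, int precision, int scale) {
        return v.setScale(scale, RoundingMode.HALF_UP).precision() <= precision;
    }
}
```

For example, 1.25 has unscaled value 125 at scale 2 but 12500 at scale 4, and 1234567.89 fits dec(9, 2) but not dec(5, 2).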
> h2. References
> * SPARK-3933 -- original {{DecimalAggregates}} SUM fast-path.
> * SPARK-14664 -- Window extension of the SUM fast-path.
> * SPARK-37024 -- AVG Double-regime silent precision loss (root issue keeping 
> the AVG guard conservative).
> * SPARK-24802 -- {{spark.sql.optimizer.excludedRules}} as the runtime escape 
> hatch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
