[https://issues.apache.org/jira/browse/SPARK-56627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Kent Yao updated SPARK-56627:
-----------------------------
Description:
h2. Background
The Catalyst optimizer rule {{DecimalAggregates}} (in
{{org.apache.spark.sql.catalyst.optimizer}}) rewrites {{Sum(d : DECIMAL(p,
s))}} with {{p + 10 <= MAX_LONG_DIGITS (=18)}} into
{{MakeDecimal(Sum(UnscaledValue(d)), p + 10, s)}}, replacing the generic
{{CheckOverflow}} decimal path with a Long-arithmetic fast path that materially
reduces per-row cost on aggregate and window paths. An analogous
Long-then-Double fast path exists for {{Average(d : DECIMAL(p, s))}} when
{{p + 4 <= MAX_DOUBLE_DIGITS (=15)}}.
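The arithmetic behind the two existing fast paths can be modelled with plain JVM types. This is a sketch, not Spark code, and it rests on several assumptions:
* {{ExistingFastPaths}}, {{fastSum}} and {{fastAvg}} are hypothetical names.
* Inputs carry at most {{s}} fractional digits.
* The final Double-to-decimal step rounds HALF_UP.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Plain-JVM model of the two existing DecimalAggregates fast paths.
// Names are hypothetical; the real rule rewrites Catalyst expressions instead.
object ExistingFastPaths {
  val MaxLongDigits = 18   // mirrors Decimal.MAX_LONG_DIGITS
  val MaxDoubleDigits = 15 // mirrors Decimal.MAX_DOUBLE_DIGITS

  // Unscaled Long of a DECIMAL(p, s) value (assumes at most s fractional digits).
  private def unscaled(x: JBigDecimal, s: Int): Long =
    x.setScale(s).unscaledValue.longValueExact

  // SUM: Long sum of unscaled values, then re-attach scale s
  // (the role of MakeDecimal(Sum(UnscaledValue(d)), p + 10, s)).
  def fastSum(xs: Seq[JBigDecimal], p: Int, s: Int): JBigDecimal = {
    require(p + 10 <= MaxLongDigits, s"SUM fast path needs p + 10 <= 18, got p = $p")
    JBigDecimal.valueOf(xs.map(x => unscaled(x, s)).sum, s)
  }

  // AVG: average the unscaled values as Doubles, divide by 10^s, then convert
  // to scale s + 4 (HALF_UP assumed for this last step).
  def fastAvg(xs: Seq[JBigDecimal], p: Int, s: Int): JBigDecimal = {
    require(p + 4 <= MaxDoubleDigits, s"AVG fast path needs p + 4 <= 15, got p = $p")
    val avg = xs.map(x => unscaled(x, s).toDouble).sum / xs.size / math.pow(10.0, s)
    JBigDecimal.valueOf(avg).setScale(s + 4, RoundingMode.HALF_UP)
  }
}
```

For example, take {{DECIMAL(7, 2)}} inputs 1.25, 2.50 and -0.75. {{fastSum}} adds the unscaled Longs 125 + 250 - 75 = 300 and re-attaches scale 2, giving 3.00.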
The fast paths are bypassed today whenever the input is wrapped in a
*scale-preserving widening* {{Cast}}. An example is
{{Sum(Cast(d : DECIMAL(p, s) AS DECIMAL(p', s')))}} with {{p' > p}} and
{{s' == s}}. The existing guard sees the Cast's declared precision {{p'}}, even
though the inner expression's precision {{p}} would satisfy the precondition.
This shape is common in BI-tool-generated SQL, which often normalises a column to
a wider declared decimal before aggregating it.
h2. Proposal
Extend {{DecimalAggregates}} with two new optimizer arms that peel a
scale-preserving widening decimal {{Cast}} around the {{Sum}} / {{Average}}
child, so the existing Long-backed fast path fires on the original narrow input:
* *SUM arm* -- rewrite {{Sum(Cast(inner : dec(p, s), dec(p', s)))}} with {{p' >= p}} and {{p + 10 <= 18}} into {{Cast(MakeDecimal(Sum(UnscaledValue(inner)), p + 10, s), DecimalType.bounded(p' + 10, s))}}. The outer compensating Cast preserves the user-visible output type exactly, because an unpeeled {{Sum}} over {{dec(p', s)}} already returns {{DecimalType.bounded(p' + 10, s)}}.
* *AVG arm* -- same shape on {{Average}}, with a *strict-subset guard* {{p <= AVG_PEEL_MAX_INNER_PRECISION (= 7)}}. This conservative bound keeps the new arm's trigger surface inside the existing fast path's {{Average(decimal(p <= 7, s))}} surface, so SPARK-37024 (Double-regime silent precision loss) is *not* amplified by this change. The mathematical divide-guard limit is wider ({{p + 4 <= 15}}, i.e. {{p <= 11}}); the {{<= 7}} bound is documented in code and locked by an invariant guard test.
Both arms share a single private {{WidenedDecimalChild}} extractor. It explicitly
refuses to unwrap {{CheckOverflow}}, so row-level overflow semantics are
preserved. It also refuses to unwrap {{PromotePrecision}}.
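A self-contained sketch of the SUM arm and the shared extractor follows. It illustrates the pattern and is not the patch's code. It models Catalyst with a toy expression tree:
* {{Col}}, {{Dec}}, {{LongT}} and {{bounded}} are hypothetical stand-ins.
* The simplified {{Cast}}, {{CheckOverflow}}, {{MakeDecimal}} and {{Sum}} classes are also stand-ins; the real Catalyst classes take more parameters.
* The AVG arm follows the same shape with the {{p <= 7}} guard and is omitted for brevity.

```scala
object PeelSketch {
  val MaxLongDigits = 18 // mirrors Decimal.MAX_LONG_DIGITS

  sealed trait DType
  case object LongT extends DType
  final case class Dec(p: Int, s: Int) extends DType

  // Hypothetical stand-in for DecimalType.bounded: clamps precision and scale to 38.
  def bounded(p: Int, s: Int): Dec = Dec(math.min(p, 38), math.min(s, 38))

  // Toy expression tree; these are not the real Catalyst classes.
  sealed trait Expr { def dataType: DType }
  final case class Col(name: String, dataType: DType) extends Expr
  final case class Cast(child: Expr, dataType: DType) extends Expr
  final case class CheckOverflow(child: Expr, dataType: DType) extends Expr
  final case class UnscaledValue(child: Expr) extends Expr { def dataType: DType = LongT }
  final case class MakeDecimal(child: Expr, dataType: Dec) extends Expr
  final case class Sum(child: Expr) extends Expr {
    // Sum over DECIMAL(p, s) yields DECIMAL(p + 10, s); over Long it yields Long.
    def dataType: DType = child.dataType match {
      case Dec(p, s) => bounded(p + 10, s)
      case other     => other
    }
  }

  // Matches only a bare Cast(inner: DECIMAL(p, s) AS DECIMAL(p', s)) with p' >= p,
  // so CheckOverflow(Cast(...)) is never unwrapped.
  object WidenedDecimalChild {
    def unapply(e: Expr): Option[(Expr, Int, Int, Int)] = e match {
      case Cast(inner, Dec(pOuter, sOuter)) =>
        inner.dataType match {
          case Dec(p, s) if s == sOuter && pOuter >= p => Some((inner, p, pOuter, s))
          case _                                       => None
        }
      case _ => None
    }
  }

  // SUM arm: peel the Cast, run the Long fast path on the inner child, then
  // restore the user-visible type with an outer compensating Cast.
  def peelSum(e: Expr): Expr = e match {
    case Sum(WidenedDecimalChild(inner, p, pOuter, s)) if p + 10 <= MaxLongDigits =>
      Cast(MakeDecimal(Sum(UnscaledValue(inner)), Dec(p + 10, s)), bounded(pOuter + 10, s))
    case other => other
  }
}
```

For example, take a {{dec(7, 2)}} column widened to {{dec(17, 2)}}. The existing arm rejects it: it sees {{p' = 17}}, and {{17 + 10 > 18}}. {{peelSum}} matches on the inner {{p = 7}} instead, and returns an expression whose type is still {{dec(27, 2)}}.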
h2. Guard asymmetry
||arm||trigger condition||compared with the existing fast-path guard||rationale||
|SUM widened-Cast|{{p + 10 <= MAX_LONG_DIGITS (18)}}|identical to the existing SUM fast arm|The Long accumulator never wraps in the peel domain; the existing SUM safety bound applies unchanged.|
|AVG widened-Cast|{{p <= AVG_PEEL_MAX_INNER_PRECISION (7)}}|tighter than the existing AVG arm's mathematical limit ({{p + 4 <= 15}}, i.e. {{p <= 11}})|Conservative narrowing: stays strictly inside the existing AVG arm's actual surface, so SPARK-37024 Double-regime symptoms cannot be amplified by this change. Relaxing the bound past 7 requires re-evaluating SPARK-37024 first.|
Two invariant guard tests in {{DecimalAggregatesSuite}} pin the boundary.
{{SUM(CAST(dec(9,2) AS dec(19,2)))}} ({{9 + 10 > 18}}) must stay unpeeled today,
and so must {{AVG(CAST(dec(8,2) AS dec(20,2)))}} ({{8 > 7}}). Any future change
that relaxes either guard without re-deriving the safety bound will fail these
tests in CI.
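The two guards and the boundary invariants above can be written as plain predicates. The object and method names below are hypothetical. The constants mirror {{MAX_LONG_DIGITS}}, {{MAX_DOUBLE_DIGITS}} and {{AVG_PEEL_MAX_INNER_PRECISION}} as described in this issue.

```scala
// Hypothetical predicates mirroring the guard table: (p, s) is the inner decimal
// type, and (pOuter, sOuter) is the declared type of the widening Cast.
object PeelGuards {
  val MaxLongDigits = 18
  val MaxDoubleDigits = 15
  val AvgPeelMaxInnerPrecision = 7

  // Shape check shared by both arms: scale preserved, precision not narrowed.
  private def isWidening(p: Int, s: Int, pOuter: Int, sOuter: Int): Boolean =
    sOuter == s && pOuter >= p

  def sumPeels(p: Int, s: Int, pOuter: Int, sOuter: Int): Boolean =
    isWidening(p, s, pOuter, sOuter) && p + 10 <= MaxLongDigits

  def avgPeels(p: Int, s: Int, pOuter: Int, sOuter: Int): Boolean =
    isWidening(p, s, pOuter, sOuter) && p <= AvgPeelMaxInnerPrecision

  // The existing AVG arm's own limit, for comparison: p + 4 <= 15, i.e. p <= 11.
  def existingAvgFastPath(p: Int): Boolean = p + 4 <= MaxDoubleDigits

  // The two invariant guard cases: both must stay unpeeled.
  val invariantsHold: Boolean = !sumPeels(9, 2, 19, 2) && !avgPeels(8, 2, 20, 2)
}
```

Inputs with {{p}} in {{[8, 11]}} satisfy {{existingAvgFastPath}} but not {{avgPeels}}. That is the AVG band the suite pins as a boundary.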
h2. Window arm
Both arms extend symmetrically to {{WindowExpression(Sum/Average(...))}},
mirroring SPARK-14664's original Window extension of {{DecimalAggregates}}.

No production-benefit claim is made for the Window arm. The peel rule is
structurally identical and safe under the same guards, but no TPC-DS / TPC-H
query exercising {{SUM(CAST decimal) OVER win}} is known. The benchmark's Window
cases (Section B) are therefore not a speedup measurement. They serve as a
neutral pair that confirms on/off symmetry today.

Symmetric extension is preferred over an {{Aggregate}}-only change on
engineering grounds. It avoids introducing a rule-arm asymmetry that future
reviewers would have to special-case against the existing dual-arm shape from
SPARK-3933 / SPARK-14664.
h2. SQLConf gate
*None.* Precedent: SPARK-3933 (the original SUM Long fast-path optimisation)
and SPARK-14664 (the Window extension) both landed without a SQLConf gate. The
peel arms here are no less conservative than those existing arms: the guard is
identical for SUM, and the trigger surface is narrower for AVG.

SPARK-24802 ({{spark.sql.optimizer.excludedRules}}) provides the standard
runtime escape hatch. It works at rule granularity, so excluding
{{DecimalAggregates}} disables the pre-existing fast paths as well as the new
peel arms.
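A minimal config sketch of the escape hatch follows. It assumes a Spark application in which a {{SparkSession}} can be created; the app name is arbitrary. {{spark.sql.optimizer.excludedRules}} takes a comma-separated list of fully qualified rule names.

```scala
import org.apache.spark.sql.SparkSession

// Assumes Spark on the classpath; the app name is arbitrary.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("exclude-decimal-aggregates")
  .getOrCreate()

// Excludes the whole DecimalAggregates rule: the pre-existing SUM/AVG fast paths
// are disabled together with the new peel arms.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.DecimalAggregates")
```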
h2. Verification
* {{DecimalAggregatesSuite}} (catalyst, plan-level): 30 tests covering
fast-path SUM/AVG no-Cast regression, widened-Cast SUM/AVG peel positives,
narrowing/scale-change/{{CheckOverflow(Cast)}} negatives, {{Literal(null)}} and
empty-{{LocalRelation}} plan-shape witnesses, idempotence + RuleExecutor
convergence, AVG band {{[8, 11]}} boundary, and the Long-accumulator /
Double-regime safety boundary invariant guards.
* {{ScalaCheck}} plan-shape PBT (catalyst): structural invariants over the
{{(p, p', s)}} peel-eligible lattice.
* {{ScalaCheck}} numerical-equivalence PBT (sql/core): SUM PBT vs a {{java.math.BigDecimal}} oracle over {{(p in [1, 8], p' in [max(p+1, 9), 28], s in [0, p], n in [1, 1000])}}; AVG PBT vs the unpeeled {{AVG(x)}} (the peeled and unpeeled plans share the same Double-divide path; only the precision of the outer compensating Cast differs). The regime with {{p' > 11}} is exercised by a separate shape witness asserting the expected {{DecimalType.bounded(p' + 4, s + 4)}} output schema.
* Plan stability: {{TPCDSV1_4PlanStabilitySuite}} and
{{TPCDSV1_4PlanStabilityWithStatsSuite}} regenerated for q18 (the canonical
{{SUM(CAST(dec(7, 2) AS dec(17, 2)))}} workload). {{TPCDSV2_7_*}} suites
unchanged (v2.7 q18a is a different shape and has no decimal widened-Cast site).
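The SUM numerical-equivalence property can be sketched without Spark. This version makes several simplifications:
* ScalaCheck is replaced with a seeded {{scala.util.Random}} loop, to stay dependency-free.
* The peeled plan is modelled as a Long sum of unscaled inner values followed by an exact widening.
* That result is checked against a {{java.math.BigDecimal}} oracle over the same {{(p, p', s, n)}} ranges listed above.

Object and method names are hypothetical.

```scala
import java.math.{BigDecimal => JBigDecimal, BigInteger}
import scala.util.Random

// Dependency-free stand-in for the ScalaCheck SUM equivalence property
// (object and method names are hypothetical).
object SumPeelEquivalence {
  // Random DECIMAL(p, s) value: unscaled magnitude below 10^p, scale s.
  def randomDecimal(rnd: Random, p: Int, s: Int): JBigDecimal = {
    val bound = BigInteger.TEN.pow(p).longValueExact
    JBigDecimal.valueOf(rnd.nextLong() % bound, s)
  }

  // Peeled plan: Long sum of the inner unscaled values (inputs already at scale s),
  // MakeDecimal(_, p + 10, s), then the compensating Cast to DECIMAL(p' + 10, s),
  // which is exact because the scale is unchanged.
  def peeledSum(xs: Seq[JBigDecimal], s: Int): JBigDecimal = {
    val longSum = xs.foldLeft(0L)((acc, x) => Math.addExact(acc, x.unscaledValue.longValueExact))
    JBigDecimal.valueOf(longSum, s)
  }

  // Oracle: exact BigDecimal sum of the Cast(x AS DECIMAL(p', s)) values.
  def oracleSum(xs: Seq[JBigDecimal], s: Int): JBigDecimal =
    xs.map(_.setScale(s)).foldLeft(JBigDecimal.ZERO.setScale(s))(_ add _)

  // Samples the (p, p', s, n) lattice; checks value equality and fit in p' + 10 digits.
  def check(trials: Int, seed: Long): Boolean = {
    val rnd = new Random(seed)
    (1 to trials).forall { _ =>
      val p = 1 + rnd.nextInt(8)                 // p in [1, 8]
      val lo = math.max(p + 1, 9)
      val pOuter = lo + rnd.nextInt(28 - lo + 1) // p' in [max(p + 1, 9), 28]
      val s = rnd.nextInt(p + 1)                 // s in [0, p]
      val n = 1 + rnd.nextInt(1000)              // n in [1, 1000]
      val xs = Seq.fill(n)(randomDecimal(rnd, p, s))
      val peeled = peeledSum(xs, s)
      peeled.compareTo(oracleSum(xs, s)) == 0 && peeled.precision <= pOuter + 10
    }
  }
}
```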
h2. Bench
{{DecimalAggregatesBenchmark}} is added in this change as a single file covering
three sections:
* SUM Aggregate (Section A, cases A1-A4).
* SUM Window (Section B, cases B1-B2).
* AVG Aggregate (Section C, cases C1-C4).

GitHub Actions results on three JDKs (17 / 21 / 25, AMD EPYC 9V74) show:
* SUM A1 / A3 / A4 (peel-eligible Aggregate cases): *~1.1x* speedup.
* SUM A2 ({{p + 10 > 18}}, peel rejected by guard): 1.0x neutral.
* SUM B1 / B2 (Window): 1.0x neutral, as stated in the Window arm section (no speedup is claimed).
* AVG C1 / C2 / C3 / C4 (all peel-eligible under the {{p <= 7}} guard): *~1.1x*
speedup uniformly.
h2. Out of scope
* Scale-changing Cast (deferred to Phase 2+; requires range-preserving reasoning).
* Narrowing Cast (peeling would not preserve correctness).
* Cast from a non-decimal source (separate mechanism).
* {{Sum(a + Cast(b, ...))}} -- a Cast nested inside arithmetic.
* {{MIN}} / {{MAX}} (covered by a different code path).
* Unwrapping {{CheckOverflow(Cast(...))}} (kept as-is to preserve row-level overflow semantics).
* Relaxing AVG guard past {{p <= 7}} (deferred until SPARK-37024 root fix).
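Here is one concrete way a naive peel could go wrong for a scale-reducing Cast. The Cast rounds each row, so summing after the Cast differs from casting after the sum. The sketch rests on these assumptions:
* Per-row rounding is HALF_UP.
* The object name is hypothetical.
* Other reasons, such as the range-preserving reasoning mentioned above, are not modelled.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Hypothetical illustration: a scale-reducing Cast rounds every row, so
// SUM(CAST(x)) is not CAST(SUM(x)) in general (HALF_UP rounding assumed).
object ScaleChangeCounterexample {
  def castToScale(x: JBigDecimal, s: Int): JBigDecimal = x.setScale(s, RoundingMode.HALF_UP)

  // Two DECIMAL(3, 3) inputs, cast to scale 2.
  val xs: Seq[JBigDecimal] = Seq(new JBigDecimal("0.005"), new JBigDecimal("0.005"))

  // What the query means: round each row, then sum: 0.01 + 0.01 = 0.02.
  val sumOfCasts: JBigDecimal = xs.map(castToScale(_, 2)).reduce(_ add _)

  // What a naive peel would compute: sum at scale 3, round once: 0.010 -> 0.01.
  val castOfSum: JBigDecimal = castToScale(xs.reduce(_ add _), 2)
}
```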
h2. References
* SPARK-3933 -- original {{DecimalAggregates}} SUM fast-path.
* SPARK-14664 -- Window extension of the SUM fast-path.
* SPARK-37024 -- AVG Double-regime silent precision loss (root issue keeping
the AVG guard conservative).
* SPARK-24802 -- {{spark.sql.optimizer.excludedRules}} as the runtime escape
hatch.
was:
## Background
The Catalyst optimizer rule `DecimalAggregates` (in
`org.apache.spark.sql.catalyst.optimizer`) rewrites `Sum(d : DECIMAL(p, s))`
with `p + 10 <= MAX_LONG_DIGITS (=18)` into `MakeDecimal(Sum(UnscaledValue(d)),
p + 10, s)`, replacing the generic `CheckOverflow` decimal path with a
Long-arithmetic fast-path that materially reduces per-row cost on aggregate and
window paths.
This rewrite is bypassed today when the input is wrapped in a *scale-preserving
widening* Cast, e.g. `Sum(Cast(d : DECIMAL(p, s) AS DECIMAL(p', s')))` with
`p' > p`, `s' == s` (scale preserved), and the inner `Sum(child)` would satisfy
the existing `p + 10 <= MAX_LONG_DIGITS` precondition.
2. Peels the Cast and lets the existing fast-path rewrite fire on the inner
`Sum(child)`.
3. Is gated by a new internal SQLConf, default `false`, so existing plans are
byte-identical with the conf off.
Scope:
* Phase 1: `Sum` only.
* Aggregate arm and Window arm covered by a single peel point.
* Rule plumbing only — no public API change, no SQL grammar change, no metrics
change.
## SQLConf
* Key: `spark.sql.optimizer.decimalAggregates.peelWidenedCastForSum.enabled`
* Type: `boolean`
* Default: `false`
* Visibility: `.internal()`
## Behaviour change
* SQLConf default (`false`): byte-identical to today.
* SQLConf `true`: only the qualifying `Sum(Cast(...))` shape changes.
## Verification
* Plan-shape PBT in `DecimalAggregatesSuite` (oracle-1/oracle-2/idempotence).
* Numerical-equivalence PBT in `DataFrameAggregateSuite`.
* `DecimalAggregatesBenchmark` under `sql/core`. Local JDK 17 smoke (N=1M,
iters=2): peel-on per-row ns < peel-off on all 6 cases at both
`decimalOperations.allowPrecisionLoss=true/false`. A1 ~1.2-1.3x.
## Public API change
None.
## SPIP
Not required.
## Affects Version
4.2.0 (master).
Summary: DecimalAggregates: peel scale-preserving widening Cast for Sum
and Average (was: DecimalAggregates: peel scale-preserving widening Cast for
Sum to enable fast-path rewrite)
> DecimalAggregates: peel scale-preserving widening Cast for Sum and Average
> --------------------------------------------------------------------------
>
> Key: SPARK-56627
> URL: https://issues.apache.org/jira/browse/SPARK-56627
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Kent Yao
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)