[ https://issues.apache.org/jira/browse/SPARK-57424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088696#comment-18088696 ]

Anupam Yadav commented on SPARK-57424:
--------------------------------------

I am working on this internally and will publish the PR once SPARK-57220 lands, 
since this is a follow-up to it.

> Add First/Last to segment-tree window aggregate allowlist
> ---------------------------------------------------------
>
>                 Key: SPARK-57424
>                 URL: https://issues.apache.org/jira/browse/SPARK-57424
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 4.3.0
>            Reporter: Anupam Yadav
>            Priority: Major
>
> SPARK-56546 introduced segment-tree-based window-frame aggregation for 
> sliding ROWS/RANGE frames, and SPARK-57220 extended it to shrinking frames 
> ({{... BETWEEN <lower> AND UNBOUNDED FOLLOWING}}). Both rely on 
> {{WindowSegmentTree.EligibleAggregates}} -- a static allowlist of 
> {{DeclarativeAggregate}} subclasses safe for segment-tree execution.
> {{First}} and {{Last}} were previously excluded from this allowlist, marked 
> "order-dependent", and continued to use the legacy 
> {{SlidingWindowFunctionFrame.write}} (O(N x W)) and 
> {{UnboundedFollowingWindowFunctionFrame.write}} (O(N^2)) paths.
> This was over-conservative. First and Last depend only on row-traversal 
> order, and row-traversal order is exactly what {{WindowSegmentTree.query}} 
> preserves: the query walks left-to-right (left partial -> full blocks 
> ascending -> right partial; within a block, {{queryDescend}} walks children in 
> ascending index order). {{First.mergeExpressions}} = 
> {{if(valueSet.left, left, right)}} and {{Last.mergeExpressions}} = 
> {{if(valueSet.right, right, left)}} are correct under that traversal: they 
> pick the row-order extreme across any contiguous range.
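The following minimal Scala sketch makes the ordering argument concrete. It is not Spark's {{WindowSegmentTree}}. It makes three assumptions: NULL is modelled as {{None}}, the aggregation buffer is a hypothetical {{Partial(value, valueSet)}} case class, and the tree is a plain recursive segment tree whose query always merges the left child before the right one. Only the two merge functions mirror the {{mergeExpressions}} quoted above.

```scala
// Sketch only: NULL is None, and Partial models the (value, valueSet) buffer.
final case class Partial(value: Option[Int], valueSet: Boolean)

// Identity partial: nothing seen yet (also the result for an empty range).
val Empty = Partial(None, valueSet = false)

// Mirrors First.mergeExpressions: if(valueSet.left, left, right)
def mergeFirst(l: Partial, r: Partial): Partial = if (l.valueSet) l else r

// Mirrors Last.mergeExpressions: if(valueSet.right, right, left)
def mergeLast(l: Partial, r: Partial): Partial = if (r.valueSet) r else l

// Recursive segment tree. The merge is NOT commutative, so child order matters.
final class SegTree(leaves: Vector[Partial], merge: (Partial, Partial) => Partial) {
  private val n = leaves.length
  private val tree = Array.fill(4 * math.max(n, 1))(Empty)
  if (n > 0) build(1, 0, n - 1)

  private def build(node: Int, lo: Int, hi: Int): Unit =
    if (lo == hi) tree(node) = leaves(lo)
    else {
      val mid = (lo + hi) / 2
      build(2 * node, lo, mid)
      build(2 * node + 1, mid + 1, hi)
      tree(node) = merge(tree(2 * node), tree(2 * node + 1))
    }

  // Partial for the inclusive row range [l, r].
  def query(l: Int, r: Int): Partial = descend(1, 0, n - 1, l, r)

  // The left child is always merged before the right child, so partials
  // combine in ascending row order.
  private def descend(node: Int, lo: Int, hi: Int, l: Int, r: Int): Partial =
    if (r < lo || hi < l) Empty
    else if (l <= lo && hi <= r) tree(node)
    else {
      val mid = (lo + hi) / 2
      merge(descend(2 * node, lo, mid, l, r), descend(2 * node + 1, mid + 1, hi, l, r))
    }
}

// Respect-nulls: every row, NULL or not, sets valueSet = true.
val rows: Vector[Option[Int]] = Vector(Some(3), None, Some(7), None, Some(1), Some(9))
val leafPartials = rows.map(v => Partial(v, valueSet = true))
val firstTree = new SegTree(leafPartials, mergeFirst)
val lastTree = new SegTree(leafPartials, mergeLast)
val rowCount = rows.length

// Sliding frame ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING.
val slidingFirst =
  (0 until rowCount).map(i => firstTree.query(math.max(0, i - 1), math.min(rowCount - 1, i + 1)).value)
val slidingLast =
  (0 until rowCount).map(i => lastTree.query(math.max(0, i - 1), math.min(rowCount - 1, i + 1)).value)
```

With these rows, {{slidingFirst}} is Some(3), Some(3), None, Some(7), None, Some(1). Under respect-nulls, a NULL row at the left edge of a frame is a legitimate FIRST result.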
> The same merge also works unchanged for IGNORE NULLS. The per-row 
> {{updateExpressions}} only set {{valueSet=true}} on non-null values, so an 
> all-NULL block produces the per-block partial {{(null, false)}} (value, 
> valueSet), which is correctly skipped when merged with a non-null block on 
> either side.
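The IGNORE NULLS case can be shown with a small sketch under the same assumptions: NULL is {{None}} and the buffer is a hypothetical {{Partial}}. The per-row update functions are modelled on the IGNORE NULLS {{updateExpressions}} of {{First}}/{{Last}} rather than copied from Spark. The sketch shows that an all-NULL block folds to {{(None, false)}} and that the unchanged merges step over it.

```scala
// Sketch only: NULL is None, and Partial models the (value, valueSet) buffer.
final case class Partial(value: Option[Int], valueSet: Boolean)
val Empty = Partial(None, valueSet = false)

// FIRST ... IGNORE NULLS per-row update: keep the first non-null value seen.
def updateFirstIgnoreNulls(acc: Partial, v: Option[Int]): Partial =
  if (acc.valueSet || v.isEmpty) acc else Partial(v, valueSet = true)

// LAST ... IGNORE NULLS per-row update: a non-null value overwrites, NULL keeps acc.
def updateLastIgnoreNulls(acc: Partial, v: Option[Int]): Partial =
  if (v.isEmpty) acc else Partial(v, valueSet = true)

// The unchanged merges.
def mergeFirst(l: Partial, r: Partial): Partial = if (l.valueSet) l else r
def mergeLast(l: Partial, r: Partial): Partial = if (r.valueSet) r else l

// Three contiguous blocks; the first and the last are all NULL.
val blocks: Vector[Vector[Option[Int]]] =
  Vector(Vector(None, None), Vector(Some(5), None, Some(8)), Vector(None))

// Per-block partials, then a left-to-right merge across blocks.
val blockFirst = blocks.map(_.foldLeft(Empty)(updateFirstIgnoreNulls))
val blockLast = blocks.map(_.foldLeft(Empty)(updateLastIgnoreNulls))
val mergedFirst = blockFirst.foldLeft(Empty)(mergeFirst)
val mergedLast = blockLast.foldLeft(Empty)(mergeLast)

// Reference: run the per-row update over all rows without blocking.
val flatFirst = blocks.flatten.foldLeft(Empty)(updateFirstIgnoreNulls)
val flatLast = blocks.flatten.foldLeft(Empty)(updateLastIgnoreNulls)
```

Splitting the rows into blocks gives the same result as folding them one by one: 5 for FIRST and 8 for LAST.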
> h2. Proposal
> Add {{classOf[First]}} and {{classOf[Last]}} to 
> {{WindowSegmentTree.EligibleAggregates}}. No new frame class, no new SQLConf, 
> no dispatcher changes -- the existing dispatcher branches in 
> {{WindowEvaluatorFactoryBase}} (shrinking and moving) already gate on 
> {{eligibleForSegTree}}, which calls {{WindowSegmentTree.isEligible}}. Update 
> the docstring to enumerate First/Last and document the audit explicitly.
> h2. Behaviour
> * Same opt-in conf: {{spark.sql.window.segmentTree.enabled}} (default 
> {{false}}, i.e. off).
> * Same eligibility gate (DeclarativeAggregate, no FILTER, no DISTINCT, 
> supported frame type).
> * Same fallback to the legacy path for partitions below 
> {{spark.sql.window.segmentTree.minPartitionRows}} rows.
> * Both respect-nulls (the default for {{first()}} / {{last()}}) and IGNORE 
> NULLS variants are routed to the segment tree.
> * No analyzer / SQL grammar / plan-shape changes.
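The routing conditions above can be summarised as a single predicate. This is a hypothetical model for illustration only. {{AggCall}}, {{Settings}}, {{usesSegmentTree}} and the string-based allowlist are invented names, not Spark APIs, and the {{minPartitionRows}} value used below is arbitrary rather than the real default.

```scala
// Hypothetical model of the routing rules listed above; none of these
// names are Spark APIs.
sealed trait AggKind
case object Declarative extends AggKind
case object Imperative extends AggKind

final case class AggCall(
    name: String,
    kind: AggKind,
    hasFilter: Boolean,
    isDistinct: Boolean,
    ignoreNulls: Boolean)

final case class Settings(segmentTreeEnabled: Boolean, minPartitionRows: Long)

// Assumed allowlist contents: only the two entries added by this change are
// shown; aggregates allowlisted by earlier changes are elided.
val allowlist: Set[String] = Set("First", "Last")

def usesSegmentTree(s: Settings, agg: AggCall, frameSupported: Boolean, partitionRows: Long): Boolean =
  s.segmentTreeEnabled &&                 // opt-in conf, default false
    agg.kind == Declarative &&            // DeclarativeAggregate only
    !agg.hasFilter && !agg.isDistinct &&  // no FILTER, no DISTINCT
    allowlist.contains(agg.name) &&       // ignoreNulls is deliberately not checked
    frameSupported &&                     // supported frame type
    partitionRows >= s.minPartitionRows   // smaller partitions fall back

// Illustrative threshold, not the real default of minPartitionRows.
val on = Settings(segmentTreeEnabled = true, minPartitionRows = 1000L)
val lastIgnoreNulls =
  AggCall("Last", Declarative, hasFilter = false, isDistinct = false, ignoreNulls = true)
```

{{ignoreNulls}} is carried but never consulted. That reflects the fourth bullet: both null modes route identically.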
> h2. Benchmark
> {{FirstLastSegmentTreeWindowBenchmark}} on Linux x86_64 (Intel Xeon Platinum 
> 8259CL @ 2.50GHz, OpenJDK 25.0.3+9-LTS):
> Sliding frame {{[-1000, +1000]}} at N=10K:
> || Aggregate || naive (best) || segtree (best) || speedup ||
> | FIRST respect-nulls |  414 ms |  94 ms |  4.4X |
> | LAST  respect-nulls |  728 ms | 101 ms |  7.2X |
> | FIRST ignore-nulls  |  528 ms |  86 ms |  6.1X |
> | LAST  ignore-nulls  |  913 ms |  91 ms | 10.0X |
> Shrinking frame {{[CURRENT ROW, UNBOUNDED FOLLOWING]}} at N=10K:
> || Aggregate || naive (best) || segtree (best) || speedup ||
> | FIRST respect-nulls | 2 158 ms |  79 ms | 27.5X |
> | LAST  respect-nulls | 2 412 ms |  79 ms | 30.6X |
> | FIRST ignore-nulls  | 2 363 ms |  76 ms | 30.9X |
> | LAST  ignore-nulls  | 3 399 ms |  79 ms | 43.0X |
> N-sweep on FIRST shrinking:
> || N || naive || segtree || speedup ||
> |   5K |     580 ms |  64 ms |   9.1X |
> |  25K |  13 407 ms | 107 ms | 125.5X |
> |  50K |  53 784 ms | 172 ms | 312.0X |
> | 100K |       --   | 287 ms |    --  |
> Naive at N=100K is omitted (extrapolated cost ~3-4 min/iter); segtree path 
> stays sub-second.
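The omitted N=100K figure can be sanity-checked, assuming the legacy shrinking path scales as O(N^2) as stated for {{UnboundedFollowingWindowFunctionFrame.write}}. Extrapolating from either measured point gives the same estimate, which falls inside the quoted ~3-4 min/iter range:

```latex
\begin{aligned}
T(100\text{K}) &\approx 53{,}784\ \text{ms} \times \left(\tfrac{100\text{K}}{50\text{K}}\right)^2 = 215{,}136\ \text{ms} \approx 3.6\ \text{min} \\
T(100\text{K}) &\approx 13{,}407\ \text{ms} \times \left(\tfrac{100\text{K}}{25\text{K}}\right)^2 = 214{,}512\ \text{ms} \approx 3.6\ \text{min}
\end{aligned}
```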
> h2. Test surface
> * {{WindowSegmentTreeAllowlistSuite}}: 4 routing tests added for {{first}} / 
> {{last}} / {{first_ignore_nulls}} / {{last_ignore_nulls}}; previous "falls 
> through" negative tests flipped; mixed-allowlist test updated to use 
> {{collect_list}} (still on the denylist).
> * {{SegmentTreeWindowFunctionSuite}}: 6 oracle equivalence tests covering 
> sliding First/Last respect-nulls and ignore-nulls, all-NULL columns in both 
> modes, and a dedicated stretches-of-consecutive-NULLs test for the IGNORE 
> NULLS merge path.
> * {{UnboundedFollowingSegmentTreeSuite}}: 5 oracle equivalence tests covering 
> shrinking First/Last respect-nulls and ignore-nulls, plus an all-NULL 
> boundary case.
> All 97 tests in the three suites pass. 33 adjacent segtree tests pass 
> unchanged.
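The oracle-equivalence idea behind these suites can be sketched outside Spark as a randomized check. This is an illustrative stand-in, not the suites' actual code. It uses a textbook order-preserving bottom-up segment tree rather than {{WindowSegmentTree}} and models NULL as {{None}}. Rows are generated as random runs so that stretches of consecutive NULLs occur. Each frame is compared against a naive left-to-right fold for both merges and both null modes, on sliding {{[-k, +k]}} and shrinking {{[CURRENT ROW, UNBOUNDED FOLLOWING]}} frames.

```scala
import scala.util.Random

// Sketch only: NULL is None, and Partial models the (value, valueSet) buffer.
final case class Partial(value: Option[Int], valueSet: Boolean)
val Empty = Partial(None, valueSet = false)
def mergeFirst(l: Partial, r: Partial): Partial = if (l.valueSet) l else r
def mergeLast(l: Partial, r: Partial): Partial = if (r.valueSet) r else l

// Leaf for one row: respect-nulls always sets valueSet; IGNORE NULLS only on non-null.
def leaf(v: Option[Int], ignoreNulls: Boolean): Partial =
  Partial(v, valueSet = !ignoreNulls || v.isDefined)

// Iterative bottom-up segment tree. `left` accumulates left-boundary nodes in
// ascending order and `right` prepends right-boundary nodes, so the final
// merge(left, right) preserves row order.
final class BottomUpTree(leaves: IndexedSeq[Partial], merge: (Partial, Partial) => Partial) {
  private val size = Iterator.iterate(1)(_ * 2).dropWhile(_ < leaves.length).next()
  private val t = Array.fill(2 * size)(Empty)
  leaves.indices.foreach(i => t(size + i) = leaves(i))
  (size - 1 to 1 by -1).foreach(i => t(i) = merge(t(2 * i), t(2 * i + 1)))

  def query(lo: Int, hi: Int): Partial = { // inclusive [lo, hi]
    var l = lo + size
    var r = hi + size + 1 // exclusive upper bound
    var left = Empty
    var right = Empty
    while (l < r) {
      if ((l & 1) == 1) { left = merge(left, t(l)); l += 1 }
      if ((r & 1) == 1) { r -= 1; right = merge(t(r), right) }
      l >>= 1
      r >>= 1
    }
    merge(left, right)
  }
}

// Legacy-style oracle: fold the frame's rows left to right.
def naive(leaves: IndexedSeq[Partial], lo: Int, hi: Int, merge: (Partial, Partial) => Partial): Partial =
  leaves.slice(lo, hi + 1).foldLeft(Empty)(merge)

// Rows made of random runs (length 1..6) of NULLs or of random non-NULL values.
val rnd = new Random(42)
def randomRows(n: Int): Vector[Option[Int]] =
  Iterator.continually {
    val run = 1 + rnd.nextInt(6)
    if (rnd.nextBoolean()) Vector.fill[Option[Int]](run)(None)
    else Vector.fill[Option[Int]](run)(Some(rnd.nextInt(100)))
  }.flatten.take(n).toVector

// Count rows where the tree disagrees with the oracle, for both merges,
// on a sliding [-k, +k] frame and a shrinking [i, n - 1] frame.
def mismatches(rows: Vector[Option[Int]], ignoreNulls: Boolean, k: Int): Int = {
  val leaves = rows.map(leaf(_, ignoreNulls))
  val n = rows.length
  Seq[(Partial, Partial) => Partial](mergeFirst, mergeLast).map { merge =>
    val tree = new BottomUpTree(leaves, merge)
    (0 until n).count { i =>
      val (lo, hi) = (math.max(0, i - k), math.min(n - 1, i + k))
      tree.query(lo, hi) != naive(leaves, lo, hi, merge) ||
        tree.query(i, n - 1) != naive(leaves, i, n - 1, merge)
    }
  }.sum
}

val failures = for {
  n <- Seq(1, 7, 64, 200)
  ignoreNulls <- Seq(false, true)
  k <- Seq(0, 3, 50)
} yield mismatches(randomRows(n), ignoreNulls, k)

println(s"oracle mismatches: ${failures.sum}")
```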
> h2. Out of scope
> * New frame class for First/Last (a future O(1)-amortized monotonic-deque 
> path is a separate optimization).
> * {{NthValue}} over moving frames (separate JIRA).
> * {{CollectList}}, {{CollectSet}}, {{Percentile}}, {{ApproxPercentile}}, 
> {{HyperLogLogPlusPlus}} -- still excluded.
> * {{ImperativeAggregate}} and UDAFs -- still excluded.
> Follows up SPARK-56546 and SPARK-57220.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
