This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 6aea60167413 [SPARK-56546][SQL][FOLLOWUP] Address review comments in 
segment-tree window frame
6aea60167413 is described below

commit 6aea6016741311d35dcee124ca505494e92f6be6
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed May 13 11:51:24 2026 +0800

    [SPARK-56546][SQL][FOLLOWUP] Address review comments in segment-tree window 
frame
    
    ### What changes were proposed in this pull request?
    
    Four small cleanups in the segment-tree moving-frame window code introduced 
by #55422:
    
    1. `WindowEvaluatorFactoryBase.scala` -- fix terminology in the `def 
processor` comment. The comment says `Keep as def (by-name)`, but `def 
processor` is a parameterless method that is re-evaluated on each call, not a 
by-name parameter (`=> T`). Reword to `Keep as def (lazy / per-call)`.
    
    2. `WindowEvaluatorFactoryBase.eligibleForSegTree` -- add a defensive `case 
_ => false` to the `frameType match` so that a future addition to the sealed 
`FrameType` trait makes the frame ineligible for the segment tree instead of 
throwing `MatchError` at runtime.
    
    3. `WindowEvaluatorFactoryBase.estimateMaxCachedBlocks` -- add a comment 
justifying the `+ 2` slack in the cached-block budget (one boundary block at 
each end of the frame's interval), since the magic number was not previously 
explained.
    
    4. `WindowSegmentTreeSuite.scala` -- fix indentation of 11 `test(` blocks 
that were declared at 4-space indent (inconsistent with the file's 2-space 
convention and the 3 other correctly-indented tests in the same file).
    
    A separate follow-up is needed to add RANGE-frame coverage to 
`WindowBenchmark` -- the current benchmark is RowFrame-only -- but that 
requires regenerating the committed JDK 17/21/25 results files and is deferred.
    
    ### Why are the changes needed?
    
    Review-comment-style follow-ups. Pure comment / defensive-default / 
whitespace changes -- no behavior change.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests cover all touched code paths. The test indentation fix is 
whitespace-only; the comment and `case _` changes have no runtime effect.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes, Claude assisted in identifying and drafting these cleanups.
    
    Closes #55815 from cloud-fan/cloud-fan/SPARK-56546-followup.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 407a29c6da0a9e88c2d57226024239c10f80c94e)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../window/WindowEvaluatorFactoryBase.scala        | 15 +++++++++++----
 .../execution/window/WindowSegmentTreeSuite.scala  | 22 +++++++++++-----------
 2 files changed, 22 insertions(+), 15 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
index cebcfd05bd24..2ae10ce9d711 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
@@ -203,10 +203,10 @@ trait WindowEvaluatorFactoryBase {
           case WindowExpression(ae: AggregateExpression, _) => ae.filter
           case _ => None
         }.toArray
-        // Keep as `def` (by-name): the FRAME_LESS_OFFSET / UNBOUNDED_OFFSET /
-        // UNBOUNDED_PRECEDING_OFFSET branches do not read `processor`. Eager
-        // `val` construction would invoke `AggregateProcessor.apply` on
-        // Lag / Lead / NthValue and throw
+        // Keep as `def` (lazy / per-call): the FRAME_LESS_OFFSET /
+        // UNBOUNDED_OFFSET / UNBOUNDED_PRECEDING_OFFSET branches do not read
+        // `processor`. Eager `val` construction would invoke
+        // `AggregateProcessor.apply` on Lag / Lead / NthValue and throw
         // `INTERNAL_ERROR: Unsupported aggregate function`.
         def processor = if 
(functions.exists(_.isInstanceOf[PythonFuncExpression])) {
           null
@@ -367,6 +367,7 @@ trait WindowEvaluatorFactoryBase {
     val frameTypeOk = frameType match {
       case RowFrame => true
       case RangeFrame => orderSpec.size == 1
+      case _ => false
     }
     conf.windowSegmentTreeEnabled &&
       frameTypeOk &&
@@ -395,6 +396,12 @@ trait WindowEvaluatorFactoryBase {
       case (IntegerLiteral(lo), CurrentRow) => Some(math.abs(lo) + 1)
       case _ => None
     }
+    // `ceil(W / blockSize)` is the minimum number of blocks a single frame can
+    // straddle; `+ 2` adds one block of slack at each end to cover the case
+    // where the frame's [lower, upper) interval is offset within its leftmost
+    // block (so the cursor temporarily holds the previous block as well) and
+    // the symmetric case at the right edge -- without this slack the LRU
+    // would thrash on the boundary blocks every time the cursor advances.
     w.map(ww => math.ceil(ww.toDouble / blockSize).toInt + 2).orElse(Some(8))
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala
index ac2a04744fcf..443d160394fa 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala
@@ -87,7 +87,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     if (out.isNullAt(0)) null else out.getInt(0)
   }
 
-    test("build and single-point query returns identity; full scan matches 
naive") {
+  test("build and single-point query returns identity; full scan matches 
naive") {
     withTaskContext {
       val values = Seq(5, 2, 9, 1, 7, 3, 4, 8, 6, 0)
       val tree = buildTree(values, fanout = 4, blockSize = 1024)
@@ -102,7 +102,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("single-block: range query matches naive baseline for random ranges") 
{
+  test("single-block: range query matches naive baseline for random ranges") {
     withTaskContext {
       val rnd = new Random(0xC0FFEE)
       val values = Seq.fill(100)(rnd.nextInt(1000))
@@ -119,7 +119,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("fanout boundaries: sizes {1, F, F+1, F*F} for fanout in {2,4,8,16}") 
{
+  test("fanout boundaries: sizes {1, F, F+1, F*F} for fanout in {2,4,8,16}") {
     withTaskContext {
       val rnd = new Random(42)
       for (fanout <- Seq(2, 4, 8, 16)) {
@@ -151,7 +151,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("identity at empty range query(k, k)") {
+  test("identity at empty range query(k, k)") {
     withTaskContext {
       val values = (1 to 50).reverse
       val tree = buildTree(values, fanout = 4, blockSize = 16)
@@ -163,7 +163,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("block boundary correctness: cross-block vs single-block baseline") {
+  test("block boundary correctness: cross-block vs single-block baseline") {
     withTaskContext {
       val rnd = new Random(123)
       val values = Seq.fill(100)(rnd.nextInt(10000))
@@ -182,7 +182,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("LRU stability: same queries in different orders produce same 
results") {
+  test("LRU stability: same queries in different orders produce same results") 
{
     withTaskContext {
       val rnd = new Random(777)
       val values = Seq.fill(100)(rnd.nextInt(10000))
@@ -213,7 +213,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("cross-block: range query matches naive baseline for random ranges") {
+  test("cross-block: range query matches naive baseline for random ranges") {
     withTaskContext {
       val rnd = new Random(0xBEEF)
       val values = Seq.fill(100)(rnd.nextInt(1000))
@@ -231,7 +231,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("cross-block: multi-block level-size invariant") {
+  test("cross-block: multi-block level-size invariant") {
     withTaskContext {
       val rnd = new Random(31337)
       val fanout = 4
@@ -329,7 +329,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("D10 rebuild: second build replaces state; failed build preserves 
prior state") {
+  test("D10 rebuild: second build replaces state; failed build preserves prior 
state") {
     withTaskContext {
       val v1 = Seq(5, 1, 9, 3, 7, 2, 8, 4, 6, 0)
       val v2 = Seq(100, 200, 50, 400, 25, 600, 12, 800)
@@ -380,7 +380,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
-    test("D11 error paths: invalid ctor args and invalid query ranges") {
+  test("D11 error paths: invalid ctor args and invalid query ranges") {
     withTaskContext {
       // Constructor validation.
       intercept[IllegalArgumentException] {
@@ -503,7 +503,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with 
LocalSparkContext {
     math.sqrt(sq / (n - 1))
   }
 
-    test("D12 block-aligned cross-block boundaries") {
+  test("D12 block-aligned cross-block boundaries") {
     withTaskContext {
       val rnd = new Random(12)
       val numRows = 50


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to