This is an automated email from the ASF dual-hosted git repository.

gengliangwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f70b6065d2d [SPARK-57199][SQL] Extract the aggregate out-of-memory error into a QueryExecutionErrors factory
4f70b6065d2d is described below

commit 4f70b6065d2de31b5509ef41035ca8a1236d8953
Author: Gengliang Wang <[email protected]>
AuthorDate: Tue Jun 2 20:57:01 2026 -0700

    [SPARK-57199][SQL] Extract the aggregate out-of-memory error into a QueryExecutionErrors factory
    
    ### What changes were proposed in this pull request?
    
    The aggregate out-of-memory error (`AGGREGATE_OUT_OF_MEMORY`) is 
constructed inline in two places:
    
    - `HashAggregateExec`, whose whole-stage codegen emits
      `throw new <SparkOutOfMemoryError>("AGGREGATE_OUT_OF_MEMORY", new java.util.HashMap());`
      into every generated aggregate class.
    - `TungstenAggregationIterator` (the interpreted fallback), which throws the same
      `new SparkOutOfMemoryError(...)` and needs a `// scalastyle:off throwerror` suppression.
    
    This PR adds a `QueryExecutionErrors.aggregateOutOfMemoryError()` factory (next to the
    existing `cannotAcquireMemory*` OOM factories) and routes both call sites through it.
    In the codegen path the emitted Java becomes
    `throw QueryExecutionErrors.aggregateOutOfMemoryError();`.
    
    ### Why are the changes needed?
    
    Sub-task of SPARK-56908 (reduce generated Java size in whole-stage codegen). Dumping the
    whole-stage codegen of the TPC-DS queries shows the inline
    `throw new org.apache.spark.memory.SparkOutOfMemoryError("AGGREGATE_OUT_OF_MEMORY", new java.util.HashMap());`
    line **445 times** across 142 of 150 generated classes -- it is the single most-repeated
    `throw` in the corpus. Replacing it with a factory call shrinks each generated aggregate
    class and moves the error-class string and the empty [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The same `AGGREGATE_OUT_OF_MEMORY` error with the same (empty) message 
parameters is thrown; only where it is constructed changes.
    
    ### How was this patch tested?
    
    This is a behavior-preserving refactor, covered by the existing aggregate suites (e.g.
    `DataFrameAggregateSuite`, whose 163 tests all pass). The change was additionally verified
    by re-dumping the TPC-DS whole-stage codegen: all 445 inline throws are now
    `QueryExecutionErrors.aggregateOutOfMemoryError()` calls, and every generated subtree still
    compiles (the Janino default imports already make `QueryExecutionErrors` available
    unqualified, as used by other generated error calls such as `divideBy [...]
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.8)
    
    Closes #56256 from gengliangwang/spark-agg-oom-factory.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala  | 6 ++++++
 .../apache/spark/sql/execution/aggregate/HashAggregateExec.scala  | 5 +----
 .../sql/execution/aggregate/TungstenAggregationIterator.scala     | 8 ++------
 3 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 6cbc2eaaabb6..48edb6e38126 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1213,6 +1213,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       new java.util.HashMap[String, String]())
   }
 
+  def aggregateOutOfMemoryError(): SparkOutOfMemoryError = {
+    new SparkOutOfMemoryError(
+      "AGGREGATE_OUT_OF_MEMORY",
+      new java.util.HashMap[String, String]())
+  }
+
   def cannotAcquireMemoryForWindowAggregateError(
       requestedBytes: Long,
       receivedBytes: Long): SparkOutOfMemoryError = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 7b3f9ec9951e..e1bfb50634d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.LogKeys.CONFIG
-import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -660,8 +659,6 @@ case class HashAggregateExec(
       case _ => ("true", "", "")
     }
 
-    val oomeClassName = classOf[SparkOutOfMemoryError].getName
-
     val findOrInsertRegularHashMap: String =
       s"""
          |// generate grouping key
@@ -687,7 +684,7 @@ case class HashAggregateExec(
          |    $unsafeRowKeys, $unsafeRowKeyHash);
          |  if ($unsafeRowBuffer == null) {
          |    // failed to allocate the first page
-         |    throw new $oomeClassName("AGGREGATE_OUT_OF_MEMORY", new java.util.HashMap());
+         |    throw QueryExecutionErrors.aggregateOutOfMemoryError();
          |  }
          |}
        """.stripMargin
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 073e5929025b..00d18a2f79a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -17,16 +17,14 @@
 
 package org.apache.spark.sql.execution.aggregate
 
-import java.util
-
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, UnsafeKVExternalSorter}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.unsafe.KVIterator
@@ -211,9 +209,7 @@ class TungstenAggregationIterator(
           buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
           if (buffer == null) {
             // failed to allocate the first page
-            // scalastyle:off throwerror
-            throw new SparkOutOfMemoryError("AGGREGATE_OUT_OF_MEMORY", new util.HashMap())
-            // scalastyle:on throwerror
+            throw QueryExecutionErrors.aggregateOutOfMemoryError()
           }
         }
         processRow(buffer, newInput)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to