[
https://issues.apache.org/jira/browse/SPARK-50992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ángel Álvarez Pascua updated SPARK-50992:
-----------------------------------------
Attachment: Main.scala
> OOMs and performance issues with AQE in large plans
> ---------------------------------------------------
>
> Key: SPARK-50992
> URL: https://issues.apache.org/jira/browse/SPARK-50992
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.0.0, 3.5.3, 3.5.4
> Reporter: Ángel Álvarez Pascua
> Priority: Major
> Attachments: Main.scala
>
>
> When AQE is enabled, Spark triggers update events to the internal listener
> bus whenever a plan changes. These events include a plain-text description of
> the plan, which is computationally expensive to generate for large plans.
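>
> To see how large these descriptions get, a listener can log the length of
> each update's plan string. This is a minimal diagnostic sketch, not part of
> the attached reproduction: {{PlanDescriptionSizeListener}} is a hypothetical
> name, and it assumes the updates arrive as
> {{SparkListenerSQLAdaptiveExecutionUpdate}} events. It only observes; the
> string has already been built by the time the event is posted.
>
> {code:scala}
> import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
> import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
>
> // Hypothetical helper: prints the size of every AQE plan-update description.
> class PlanDescriptionSizeListener extends SparkListener {
>   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
>     case e: SparkListenerSQLAdaptiveExecutionUpdate =>
>       val chars = e.physicalPlanDescription.length
>       println(s"execution ${e.executionId}: plan description has $chars chars")
>     case _ => // ignore every other event type
>   }
> }
>
> // Register it right after the SparkSession is created:
> // spark.sparkContext.addSparkListener(new PlanDescriptionSizeListener)
> {code}
>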
> *Key Issues:*
> # *High Cost of Plan String Calculation:*
> ** Generating the string description for large physical plans is a costly
> operation.
> ** This impacts performance, particularly in complex workflows with frequent
> plan updates (e.g. persisting DataFrames).
> # *Out-of-Memory (OOM) Errors:*
> ** Events are stored in the listener bus as {{SQLExecutionUIData}} objects
> and retained until a threshold is reached.
> ** This retention behavior can lead to memory exhaustion when processing
> large plans, causing OOM errors.
> # *Current Workarounds Are Ineffective:*
> ** *Reducing Retained Executions* ({{spark.sql.ui.retainedExecutions}}):
> Even when set to {{1}} or {{0}}, events are still created, requiring plan
> string calculations.
> ** *Limiting Plan String Length* ({{spark.sql.maxPlanStringLength}}):
> Reducing the maximum string length (e.g., to {{1,000,000}}) may mitigate
> OOMs but does not eliminate the overhead of string generation.
> ** *Available Explain Modes:* Every existing explain mode ({{simple}},
> {{extended}}, {{codegen}}, {{cost}}, {{formatted}}) still generates a plan
> string, so none of them avoids the cost or the OOMs.
> *Proposed Solution:*
> Introduce a new explain mode, *{{off}}*, which suppresses the generation
> of plan string descriptions.
> * When this mode is enabled, Spark skips the calculation of plan
> descriptions altogether.
> * This resolves OOM errors and restores performance parity with non-AQE
> execution.
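>
> The idea can be illustrated with a toy model. This is a sketch under stated
> assumptions, not Spark's implementation: {{PlanNode}}, {{ExplainMode}} and
> {{PlanDescription.describe}} are hypothetical names invented for the example.
> The point is only that an {{off}} branch returns before the tree is walked,
> so its cost does not depend on the size of the plan.
>
> {code:scala}
> // Hypothetical toy model of explain modes (not Spark classes).
> sealed trait ExplainMode
> case object Simple extends ExplainMode
> case object Off extends ExplainMode
>
> final case class PlanNode(name: String, children: Seq[PlanNode]) {
>   // Renders the whole tree; the work grows with the number of nodes.
>   def treeString: String =
>     (name +: children.flatMap(_.treeString.split("\n").map("  " + _))).mkString("\n")
> }
>
> object PlanDescription {
>   def describe(plan: PlanNode, mode: ExplainMode): String = mode match {
>     case Off    => ""              // skip rendering entirely
>     case Simple => plan.treeString // full traversal of the plan
>   }
> }
> {code}
>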
> *Impact of Proposed Solution:*
> * Eliminates OOMs in large plans with AQE enabled.
> * Reduces the performance overhead associated with plan string generation.
> * Ensures Spark scales better in environments with large, complex plans.
> *Reproducibility:*
> The following test replicates the issue:
>
> {code:java}
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.functions._
>
> object Main {
>   def main(args: Array[String]): Unit = {
>     // Create SparkSession
>     val spark = SparkSession.builder()
>       .master("local[*]")
>       // Disabling AQE fixes it
>       //.config("spark.sql.adaptive.enabled", "false")
>       // Still costly and doesn't fix the OOM
>       //.config("spark.sql.ui.retainedExecutions","1")
>       //.config("spark.sql.ui.retainedExecutions","0")
>       // Still costly, triggers lots of warnings and ... fixes it?
>       // After 15 min it still hasn't reached the 23rd iteration
>       //.config("spark.sql.maxPlanStringLength","1000000")
>       // Currently available explain modes are costly and throw OOM
>       //.config("spark.sql.ui.explainMode","simple")
>       //.config("spark.sql.ui.explainMode","extended")
>       //.config("spark.sql.ui.explainMode","codegen")
>       //.config("spark.sql.ui.explainMode","cost")
>       //.config("spark.sql.ui.explainMode","formatted")
>       // Works perfectly fine and takes the same time as with AQE disabled
>       //.config("spark.sql.ui.explainMode","off")
>       .getOrCreate()
>     import spark.implicits._
>
>     // Create an empty DataFrame (no columns yet)
>     var df: DataFrame = spark.emptyDataFrame
>     println("Initial empty DataFrame created.")
>
>     // Start measuring execution time
>     val startTime = System.currentTimeMillis()
>
>     // Loop to modify the DataFrame
>     for (i <- 1 to 25) {
>       // Add a new column with null values and cache the result
>       df = df.withColumn(s"col$i", lit(null: String)).cache()
>       // Filter the DataFrame
>       df = df.filter($"col$i".isNotNull)
>       // Explain the query plan
>       //df.explain()
>       // Show the DataFrame
>       df.show()
>     }
>
>     // Print execution time
>     val totalTime = System.currentTimeMillis() - startTime
>     println(s"Total execution time: $totalTime ms")
>
>     // Stop the SparkSession
>     spark.stop()
>     println("SparkSession stopped.")
>   }
> }{code}
>