[ 
https://issues.apache.org/jira/browse/SPARK-55311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Angel Alvarez Pascua updated SPARK-55311:
-----------------------------------------
    Attachment: test_round_3.py

> ChecksumCheckpointFileManager spawns 800 threads by default in streaming jobs
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-55311
>                 URL: https://issues.apache.org/jira/browse/SPARK-55311
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 4.1.0, 4.1.1
>            Reporter: Angel Alvarez Pascua
>            Priority: Trivial
>         Attachments: test_round_3.py
>
>
> Spark 4.1 introduced a new feature ({*}ChecksumCheckpointFileManager{*}) 
> where each partition creates its own {{ChecksumFileOutputSuite}}, and 
> each suite spawns *4 threads* to parallelize checkpoint file operations 
> (offset logs, metadata, state updates, commit logs, etc.).
> In *Structured Streaming*, every trigger writes checkpoint files. The 
> effect becomes extreme when users run streaming queries with the default:
> {{spark.sql.shuffle.partitions = 200}}
> With the new threaded ChecksumCheckpointFileManager implementation:
>  * 200 shuffle partitions × 4 threads per partition = *~800 new threads 
> created*
>  * Even simple streaming jobs (e.g., noop sink) see large thread-pool 
> explosions
>  * {{.outputMode("complete")}} amplifies the effect by rewriting results each 
> trigger
>  * High throughput workloads (e.g., SHA-512 hashing, explode operations, 50k 
> rows/s) cause sustained thread churn and pressure on the JVM and OS
> These threads are created *regardless of user intent* and *regardless of 
> whether high parallelism is needed*, because streaming uses the batch 
> default of 200 partitions.
> This regression does _not_ appear in batch jobs because they write far fewer 
> metadata files.
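> *Reproduction Sketch*
> A minimal sketch of the effect (hypothetical, written in the spirit of the 
> attached test_round_3.py rather than copied from it): a rate-source query 
> shuffled across the default 200 partitions, plus a JVM live-thread probe 
> through the Py4J gateway. It assumes classic PySpark (not Spark Connect) in 
> local mode, where driver and executors share one JVM so the probe sees the 
> state-store threads; the app name, checkpoint path and rowsPerSecond value 
> are illustrative.
> {code:python}
> from pyspark.sql import SparkSession
> import time
> 
> spark = (
>     SparkSession.builder
>     .appName("checksum-thread-repro")  # illustrative name
>     .master("local[4]")                # local mode: probe sees executor threads
>     .getOrCreate()
> )
> 
> # The stock default; set explicitly so the sketch is self-describing.
> spark.conf.set("spark.sql.shuffle.partitions", "200")
> 
> query = (
>     spark.readStream.format("rate")
>     .option("rowsPerSecond", 50000)    # throughput in the range described above
>     .load()
>     .groupBy("value")                  # forces a shuffle: one state store per partition
>     .count()
>     .writeStream
>     .outputMode("complete")
>     .format("noop")                    # lightweight sink, as in the description
>     .option("checkpointLocation", "/tmp/checksum-repro")  # illustrative path
>     .start()
> )
> 
> # Poll the JVM's live thread count across a few triggers via Py4J.
> for _ in range(6):
>     time.sleep(5)
>     print("JVM threads:", spark.sparkContext._jvm.java.lang.Thread.activeCount())
> 
> query.stop()
> {code}
> If the analysis above holds, the printed count should climb by roughly 
> partitions × 4 once the query starts triggering.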
> *Proposed Change*
> For Structured Streaming only, reduce the default:
> {{spark.sql.shuffle.partitions = 25}}
>  
> Rationale:
>  * Many streaming workloads do not require 200 partitions; 25 is sufficient 
> for typical micro-batches.
>  * Users needing higher parallelism can always increase the value explicitly.
>  * Fewer default partitions dramatically reduce 
> ChecksumCheckpointFileManager threads (25 × 4 = 100 threads instead of 800).
>  * Prevents unexpected thread explosions for users upgrading to Spark 4.1.
>  * A more sensible default until AQE is fully supported and enabled for 
> streaming.
> *Compatibility*
> Low risk. Users needing 200 partitions can explicitly override the default.
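> For reference, the explicit override is a one-liner at session build time 
> (a sketch; the config key is standard, the value illustrative):
> {code:python}
> from pyspark.sql import SparkSession
> 
> # Opt back into high shuffle parallelism where it is actually needed.
> spark = (
>     SparkSession.builder
>     .config("spark.sql.shuffle.partitions", "200")
>     .getOrCreate()
> )
> 
> # Or at runtime, since this is a dynamically settable SQL conf:
> spark.conf.set("spark.sql.shuffle.partitions", "200")
> {code}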
> *Additional Notes*
> This issue has been observed in simple workloads, where Spark 4.1 streaming 
> jobs show constant, predictable spikes of hundreds of threads even with 
> moderate throughput and lightweight sinks.
>  
> !https://miro.medium.com/v2/resize:fit:700/1*dIiANw3pVPj2DOAfuqfV-g.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
