Angel Alvarez Pascua created SPARK-55311:
--------------------------------------------

             Summary: ChecksumCheckpointFileManager spawns 800 threads by 
default in streaming jobs
                 Key: SPARK-55311
                 URL: https://issues.apache.org/jira/browse/SPARK-55311
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 4.1.1, 4.1.0
            Reporter: Angel Alvarez Pascua


Spark 4.1 introduced a new feature ({*}ChecksumCheckpointFileManager{*}) in which 
each partition creates its own {{ChecksumFileOutputSuite}}, and each suite 
spawns *4 threads* to parallelize checkpoint file operations (offset logs, 
metadata, state updates, commit logs, etc.).

In {*}Structured Streaming{*}, every trigger writes checkpoint files. The 
effect becomes extreme when users run streaming queries with the default 
{{spark.sql.shuffle.partitions = 200}}. With the new threaded 
{{ChecksumCheckpointFileManager}} implementation:
 * 200 shuffle partitions × 4 threads per partition = *~800 new threads created*

 * Even simple streaming jobs (e.g., noop sink) see large thread-pool explosions

 * {{.outputMode("complete")}} amplifies the effect by rewriting results each 
trigger

 * High-throughput workloads (e.g., SHA-512 hashing, explode operations, 50k 
rows/s) cause sustained thread churn and pressure on the JVM and OS
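
A minimal repro sketch (the rate-source settings, grouping key, and checkpoint 
path are illustrative, not taken from the reported workloads): a rate source 
into a noop sink with a shuffle-inducing aggregation is enough to materialize 
all 200 default shuffle partitions on every micro-batch.
{code:scala}
import org.apache.spark.sql.SparkSession

object ChecksumThreadRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("checksum-thread-repro")
      // spark.sql.shuffle.partitions deliberately left at the default of 200;
      // this is what triggers the ~800 checkpoint threads described above.
      .getOrCreate()
    import spark.implicits._

    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1000")
      .load()
      .groupBy($"value" % 10) // forces a shuffle, so all 200 partitions materialize
      .count()
      .writeStream
      .outputMode("complete") // rewrites the full result every trigger
      .option("checkpointLocation", "/tmp/checksum-thread-repro") // illustrative path
      .format("noop")
      .start()

    query.awaitTermination(60000) // run for one minute, then exit
    spark.stop()
  }
}
{code}
Inspecting the driver/executor JVMs (e.g., with jstack) while this runs shows 
the thread growth described above.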

These threads are created *regardless of user intent* and {*}regardless of 
whether high parallelism is needed{*}, because streaming uses the batch default 
of 200 partitions.

This regression does _not_ appear in batch jobs because they write far fewer 
metadata files.

*Proposed Change*
For Structured Streaming only, reduce the default to 
{{spark.sql.shuffle.partitions = 25}}.
 
*Rationale*
 * Many streaming workloads do not require 200 partitions; 25 is 
sufficient for typical micro-batches.

 * Users needing higher parallelism can always increase the value explicitly.

 * A lower default dramatically reduces the number of 
{{ChecksumCheckpointFileManager}} threads (25 × 4 = 100 threads instead of 800).

 * Prevents unexpected thread explosions for users upgrading to Spark 4.1.

 * A more sensible default until AQE is supported and fully enabled for 
streaming.

*Compatibility*
Low risk. Users needing 200 partitions can explicitly override the default.
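
For users who do rely on 200 partitions, a minimal sketch of the explicit 
override (the app name is illustrative); the same value can also be passed via 
{{--conf spark.sql.shuffle.partitions=200}} on {{spark-submit}}:
{code:scala}
import org.apache.spark.sql.SparkSession

// Explicitly restoring the current behavior: users who rely on
// 200 shuffle partitions keep them by setting the value themselves.
val spark = SparkSession.builder()
  .appName("my-streaming-app") // illustrative app name
  .config("spark.sql.shuffle.partitions", "200")
  .getOrCreate()
{code}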

*Additional Notes*
This issue has been observed in real workloads where Spark 4.1 streaming jobs 
show unpredictable spikes of hundreds of threads even with moderate throughput 
and lightweight sinks.
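
One lightweight way to quantify such spikes from the driver is to snapshot 
live JVM threads, as in the sketch below. The thread-name filter is an 
assumption, not a confirmed Spark thread name, and should be adjusted to 
whatever prefix the checkpoint threads actually use.
{code:scala}
import scala.jdk.CollectionConverters._

// Snapshot all live JVM threads and count the checkpoint-related ones.
// "checksum" is an assumed name fragment; adjust after inspecting a jstack dump.
def checkpointThreadCount(): Int =
  Thread.getAllStackTraces.keySet.asScala
    .count(_.getName.toLowerCase.contains("checksum"))

// Poll once per second while a query runs to watch growth across triggers.
(1 to 30).foreach { _ =>
  println(s"live checkpoint threads: ${checkpointThreadCount()}")
  Thread.sleep(1000)
}
{code}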

 

!https://miro.medium.com/v2/resize:fit:700/1*dIiANw3pVPj2DOAfuqfV-g.png!


