Peter Andrew created SPARK-54055:
------------------------------------
Summary: Spark Connect sessions leak pyspark UDF daemon processes and threads
Key: SPARK-54055
URL: https://issues.apache.org/jira/browse/SPARK-54055
Project: Spark
Issue Type: Bug
Components: Connect, PySpark
Affects Versions: 3.5.5
Reporter: Peter Andrew
Each Spark Connect session that uses Python UDFs appears to leak one PySpark
`daemon` process. Over time these accumulate into the hundreds, until the
corresponding node or container goes OOM.
{code:java}
spark 263 0.0 0.0 121424 59504 ? S 05:00 0:01 \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark 1515 0.0 0.0 121324 60148 ? S 05:04 0:01 \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark 1525 0.0 0.0 121324 60400 ? S 05:04 0:01 \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark 1568 0.0 0.0 121324 60280 ? S 05:04 0:01 \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon{code}
Threads are leaking as well; for example, here is a thread dump histogram
from a sample executor with 200+ leaked daemon processes:
{code:java}
# These threads seem to be leaking
226 threads Idle Worker Monitor for /opt/spark/.venv/bin/python3
226 threads process reaper
226 threads stderr reader for /opt/spark/.venv/bin/python3
226 threads stdout reader for /opt/spark/.venv/bin/python3
250 threads Worker Monitor for /opt/spark/.venv/bin/python3
# These threads seem fine, Spark is configured with 24 cores/executor
21 threads stdout writer for /opt/spark/.venv/bin/python3
21 threads Writer Monitor for /opt/spark/.venv/bin/python3{code}
This can be reproduced by running many Spark Connect sessions in parallel:
{code:java}
parallel -n0 .venv/bin/python3 dummy.py ::: {1..200} {code}
with `dummy.py`:
{code:java}
from collections.abc import Iterable

import pandas as pd
from pyspark.sql import SparkSession


def _udf(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    yield from iterator


if __name__ == "__main__":
    spark = SparkSession.builder.remote("...").getOrCreate()
    df = spark.range(128)
    df.mapInPandas(_udf, df.schema).count()
{code}
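For reference, the growth can be observed on the executor node while the
sessions run. This check is not part of the original report and assumes
`pgrep` is available on the node:
{code:java}
# Count pyspark.daemon processes on the executor node (hypothetical check,
# not from the report). -f matches the full command line, -c prints a count.
import subprocess

out = subprocess.run(["pgrep", "-fc", "pyspark.daemon"],
                     capture_output=True, text=True)
print("pyspark.daemon processes:", out.stdout.strip() or "0")
{code}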
With Spark Connect, each session always has a `SPARK_JOB_ARTIFACT_UUID`, even
if there are no artifacts, so the UDF environment built by
`BasePythonRunner.compute` differs per session, and each session ends up with
its own `PythonWorkerFactory` and hence its own daemon process.
`PythonWorkerFactory` has a `stop` method that stops the daemon, but nothing
appears to call `PythonWorkerFactory.stop` except `SparkEnv.stop` at shutdown,
so the per-session factories and their daemons are never cleaned up when a
session ends.
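To make the mechanism concrete, here is a small self-contained Python model of
the behaviour described above (an illustration only, not Spark's actual Scala
code; the class and function names are made up, and only
`SPARK_JOB_ARTIFACT_UUID` comes from the report): a factory cache keyed by the
full worker environment never evicts entries, and every Connect session
contributes a unique UUID, so every session adds a factory and daemon that
only a global stop would clean up.
{code:java}
# Toy model of the leak described above (illustration only, not Spark code).
import uuid


class FakeWorkerFactory:
    """Stands in for a per-environment worker factory."""

    def __init__(self, python_exec, env):
        self.python_exec = python_exec
        self.env = env
        self.daemon_running = True  # placeholder for `python -m pyspark.daemon`

    def stop(self):
        self.daemon_running = False


# Cache keyed by (python executable, full worker environment).
factories = {}


def create_worker(python_exec, env):
    key = (python_exec, tuple(sorted(env.items())))
    # A new environment means a new factory, and hence a new daemon process.
    return factories.setdefault(key, FakeWorkerFactory(python_exec, env))


def stop_all():
    # Mirrors the only cleanup that exists today: shutdown.
    for factory in factories.values():
        factory.stop()


# Each Connect session brings its own artifact UUID, so each one adds an entry.
for _ in range(200):
    session_env = {"SPARK_JOB_ARTIFACT_UUID": str(uuid.uuid4())}
    create_worker("/opt/spark/.venv/bin/python3", session_env)

print(len(factories))  # 200 cached factories, i.e. 200 leaked daemons
{code}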