[ https://issues.apache.org/jira/browse/SPARK-54055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Andrew updated SPARK-54055:
---------------------------------
    Description: 
Each Spark Connect session that uses Python UDFs seems to leak one PySpark 
`daemon` process. Over time, these can accumulate into the hundreds until the 
corresponding node or container goes OOM (although this can take a while, since 
each process does not use much memory and some of its pages are shared).
{code:none}
spark        263  0.0  0.0 121424 59504 ?        S    05:00   0:01  \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark       1515  0.0  0.0 121324 60148 ?        S    05:04   0:01  \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark       1525  0.0  0.0 121324 60400 ?        S    05:04   0:01  \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark       1568  0.0  0.0 121324 60280 ?        S    05:04   0:01  \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon{code}
In addition, threads are leaking as well; for example, here is a thread-dump 
histogram from a sample executor with 200+ leaked daemon processes:
{code:none}
# These threads seem to be leaking
226 threads   Idle Worker Monitor for /opt/spark/.venv/bin/python3
226 threads   process reaper
226 threads   stderr reader for /opt/spark/.venv/bin/python3
226 threads   stdout reader for /opt/spark/.venv/bin/python3
250 threads   Worker Monitor for /opt/spark/.venv/bin/python3

# These threads seem fine; Spark is configured with 24 cores per executor
21 threads    stdout writer for /opt/spark/.venv/bin/python3
21 threads    Writer Monitor for /opt/spark/.venv/bin/python3{code}
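For reference, a histogram like the one above can be produced from a thread 
dump of the executor JVM with something along these lines (a rough sketch; it 
assumes `jstack` is available on the executor host, and `<executor-pid>` is a 
placeholder to replace with the executor's JVM PID):
{code:python}
# Rough sketch: count thread names in a jstack dump of the executor JVM.
# Assumes jstack is on PATH; replace <executor-pid> with the actual PID.
import re
import subprocess
from collections import Counter

dump = subprocess.run(
    ["jstack", "<executor-pid>"], capture_output=True, text=True, check=True
).stdout

# Each thread entry in a jstack dump starts with the thread name in quotes.
names = re.findall(r'^"([^"]+)"', dump, flags=re.MULTILINE)

for name, count in Counter(names).most_common():
    print(f"{count} threads\t{name}")
{code}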
With Spark Connect, each session always has a `SPARK_JOB_ARTIFACT_UUID`, even 
if it has no artifacts, so the UDF worker environment built by 
`BasePythonRunner.compute` differs from every other session's, and each session 
ends up with its own `PythonWorkerFactory` and hence its own daemon process.
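A minimal sketch of this keying behaviour, for illustration only (a simplified 
Python model, not the actual Scala implementation; the names below are made 
up):
{code:python}
# Simplified model: worker factories are cached per (python exec, env vars).
# A fresh per-session SPARK_JOB_ARTIFACT_UUID means the cache key never
# repeats, so every session gets its own factory (and its own daemon).
import uuid

factories = {}  # (python_exec, frozenset(env)) -> "factory"


def get_factory(python_exec, env_vars):
    key = (python_exec, frozenset(env_vars.items()))
    if key not in factories:
        factories[key] = object()  # stand-in for a factory owning a daemon
    return factories[key]


# Classic Spark: identical env across jobs -> one factory is reused.
get_factory("python3", {})
get_factory("python3", {})
print(len(factories))  # 1

# Spark Connect: each session injects its own UUID -> a new factory every time.
for _ in range(3):
    get_factory("python3", {"SPARK_JOB_ARTIFACT_UUID": str(uuid.uuid4())})
print(len(factories))  # 4
{code}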

`PythonWorkerFactory` has a `stop` method that stops the daemon, but nothing 
seems to call `PythonWorkerFactory.stop` except `SparkEnv.stop` at shutdown, so 
the per-session factories and their daemons are never cleaned up while the 
executor is running.

 

This can be reproduced by running a number of Spark Connect sessions in 
parallel:
{code:bash}
parallel -n0 .venv/bin/python3 dummy.py ::: {1..200} {code}
with `dummy.py`:
{code:python}
from collections.abc import Iterable

import pandas as pd
from pyspark.sql import SparkSession


def _udf(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    yield from iterator


if __name__ == "__main__":
    spark = SparkSession.builder.remote("...").getOrCreate()

    df = spark.range(128)
    df.mapInPandas(_udf, df.schema).count()
 {code}
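Once the sessions have run, the accumulated daemons can be counted on each 
executor host with something like the following (a diagnostic sketch; assumes a 
Linux host with procps `ps` available):
{code:python}
# Diagnostic sketch: count leaked pyspark.daemon processes on an executor host.
import subprocess

ps = subprocess.run(["ps", "-eo", "args"], capture_output=True, text=True, check=True)
daemons = [line for line in ps.stdout.splitlines() if "-m pyspark.daemon" in line]
print(f"{len(daemons)} pyspark.daemon processes")
{code}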
 


> Spark Connect sessions leak pyspark UDF daemon processes and threads
> --------------------------------------------------------------------
>
>                 Key: SPARK-54055
>                 URL: https://issues.apache.org/jira/browse/SPARK-54055
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, PySpark
>    Affects Versions: 3.5.5
>            Reporter: Peter Andrew
>            Priority: Major
>