[ 
https://issues.apache.org/jira/browse/SPARK-23246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MBA Learns to Code updated SPARK-23246:
---------------------------------------
    Description: 
I am experiencing consistent OOM crashes when using PySpark for iterative 
algorithms in which I create new DataFrames in each iteration (e.g. by sampling 
from a "mother" DataFrame), do some work with them, and never need them again 
in later iterations.

The script below simulates such OOM failures. Even when one explicitly tries to 
.unpersist() the temporary DataFrames (via the --unpersist flag below) and/or 
deletes and garbage-collects the Python objects (via the --py-gc flag below), 
the corresponding Java objects seem to linger and accumulate until they exceed 
the JVM/driver memory.

Please suggest how I may overcome this, so that we can have long-running 
iterative Spark programs that use resources only up to a bounded, controllable 
limit.
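
(For reference, the script accepts --unpersist, --py-gc and --n-partitions flags; an illustrative invocation, assuming it is saved as test_oom.py, would be: spark-submit test_oom.py --unpersist --py-gc.)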

 
{code:python}
from __future__ import print_function

import argparse
import gc
import pandas

import pyspark


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--unpersist', action='store_true')
arg_parser.add_argument('--py-gc', action='store_true')
arg_parser.add_argument('--n-partitions', type=int, default=1000)
args = arg_parser.parse_args()


# create SparkSession
# (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
spark = pyspark.sql.SparkSession.builder \
    .config('spark.executor.instances', '2') \
    .config('spark.executor.cores', '2') \
    .config('spark.executor.memory', '512m') \
    .enableHiveSupport() \
    .getOrCreate()


# create a Parquet file for subsequent repeated loading
df = spark.createDataFrame(
    pandas.DataFrame(
        dict(
            row=range(args.n_partitions),
            x=args.n_partitions * [0]
        )
    )
)

parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)

df.write.parquet(
    path=parquet_path,
    partitionBy='row',
    mode='overwrite'
)


i = 0


# the loop below simulates an iterative algorithm that creates new DataFrames
# in each iteration (e.g. by sampling from a "mother" DataFrame), does some
# work, and never needs those DataFrames again in later iterations
# we are having trouble cleaning up the built-up metadata,
# so the program eventually crashes with an OOM
while True:
    _df = spark.read.parquet(parquet_path)

    if args.unpersist:
        _df.unpersist()

    if args.py_gc:
        del _df
        gc.collect()

    i += 1; print('COMPLETED READ ITERATION #{}\n'.format(i))
{code}
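
For reference, below is a sketch of the additional cleanup I could imagine trying on top of the above; the use of spark.catalog.clearCache() inside the loop and the periodic spark.stop()/rebuild of the SparkSession are assumptions on my part about possible workarounds, not confirmed remedies for this build-up.
{code:python}
# hypothetical variation of the loop above (same session, imports, parquet_path
# and counter i as in the script); none of this is a confirmed fix
while True:
    _df = spark.read.parquet(parquet_path)

    _df.unpersist()
    del _df
    gc.collect()

    # drop everything cached on the JVM side, not just this DataFrame
    # (assumption: this may release more than per-DataFrame .unpersist())
    spark.catalog.clearCache()

    i += 1
    print('COMPLETED READ ITERATION #{}\n'.format(i))

    # last resort: periodically tear down and recreate the whole session
    # (assumption: an acceptable cost once every 1000 iterations)
    if i % 1000 == 0:
        spark.stop()
        spark = pyspark.sql.SparkSession.builder \
            .config('spark.executor.instances', '2') \
            .config('spark.executor.cores', '2') \
            .config('spark.executor.memory', '512m') \
            .enableHiveSupport() \
            .getOrCreate()
{code}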
 

> (Py)Spark OOM because of metadata build-up that cannot be cleaned
> -----------------------------------------------------------------
>
>                 Key: SPARK-23246
>                 URL: https://issues.apache.org/jira/browse/SPARK-23246
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: MBA Learns to Code
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
