Modi Tamam created SPARK-28269:
----------------------------------

             Summary: ArrowStreamPandasSerializer gets stuck
                 Key: SPARK-28269
                 URL: https://issues.apache.org/jira/browse/SPARK-28269
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.4.3
            Reporter: Modi Tamam


I'm working with PySpark version 2.4.3.

I have a big data frame:
 * ~15M rows
 * ~130 columns
 * ~2.5 GB - converting it to a Pandas data frame and then pickling it (pandas_df.to_pickle()) produced a file of about 2.5 GB.

I have some code that groups this data frame and applies a Pandas UDF:

 
{code:java}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F

non_issued_patch = "31.7996378000_35.2114362000"
issued_patch = "31.7995787833_35.2121463045"

@pandas_udf("patch_name string", PandasUDFType.GROUPED_MAP)
def foo(pdf):
    import pandas as pd
    return pd.DataFrame({'patch_name': [pdf['patch_name'].iloc[0]]})

full_df = spark.read.parquet('debug-mega-patch')
df = full_df.filter(F.col("grouping_column") == issued_patch).cache()

df.groupBy("grouping_column").apply(foo).repartition(1) \
    .write.mode('overwrite').parquet('debug-df/')
{code}
 
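One knob worth checking with data of this size is the Arrow batch size used by the grouped-map path. This is a hedged sketch, not a confirmed fix: {{spark.sql.execution.arrow.maxRecordsPerBatch}} is a real Spark 2.4 setting, but the value below is illustrative and assumes an active {{spark}} session.

```python
# Hedged workaround sketch, not a confirmed fix: smaller Arrow batches
# reduce the memory the Python worker must hold per record batch.
# Assumes an active `spark` session; 1000 is illustrative (default 10000).
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")
```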

The above code gets stuck in ArrowStreamPandasSerializer, on the first
line, when reading a batch from the reader:

 
{code:java}
for batch in reader:
    yield [self.arrow_to_pandas(c)
           for c in pa.Table.from_batches([batch]).itercolumns()]
{code}
 
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
