Modi Tamam created SPARK-28269:
----------------------------------
Summary: ArrowStreamPandasSerializer gets stuck
Key: SPARK-28269
URL: https://issues.apache.org/jira/browse/SPARK-28269
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.4.3
Reporter: Modi Tamam
I'm working with PySpark version 2.4.3.
I have a big data frame:
* ~15M rows
* ~130 columns
* ~2.5 GB - after converting it to a Pandas data frame, pickling it
(pandas_df.to_pickle()) produced a file of about 2.5 GB.
The following code groups this data frame and applies a Pandas UDF:
{code:java}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F
import pyarrow.parquet as pq
import pyarrow as pa

non_issued_patch = "31.7996378000_35.2114362000"
issued_patch = "31.7995787833_35.2121463045"

@pandas_udf("patch_name string", PandasUDFType.GROUPED_MAP)
def foo(pdf):
    import pandas as pd
    # Return a one-row frame holding the group's first patch_name.
    ret_val = pd.DataFrame({'patch_name': [pdf['patch_name'].iloc[0]]})
    return ret_val

full_df = spark.read.parquet('debug-mega-patch')
df = full_df.filter(F.col("grouping_column") == issued_patch).cache()
df.groupBy("grouping_column").apply(foo).repartition(1).write.mode('overwrite').parquet('debug-df/')
{code}
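For clarity, the per-group semantics of foo (independent of the hang itself) can be sketched in plain Python. The rows, keys, and values below are made-up illustrations, not data from the actual job:

```python
# Hypothetical stand-in for Spark's grouped-map apply: group rows by a key
# and run foo's logic (emit the first patch_name per group) in plain Python.
from itertools import groupby
from operator import itemgetter

rows = [
    {"grouping_column": "a", "patch_name": "p1"},
    {"grouping_column": "a", "patch_name": "p2"},
    {"grouping_column": "b", "patch_name": "p3"},
]

def foo(group_rows):
    # Mirrors the UDF: return only the group's first patch_name.
    return {"patch_name": group_rows[0]["patch_name"]}

# groupby requires the input sorted by the grouping key.
result = [
    foo(list(g))
    for _, g in groupby(sorted(rows, key=itemgetter("grouping_column")),
                        key=itemgetter("grouping_column"))
]
# → [{'patch_name': 'p1'}, {'patch_name': 'p3'}]
```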
The code above gets stuck in ArrowStreamPandasSerializer, on the first
line of the loop below, while reading a batch from the reader:
{code:java}
for batch in reader:
    yield [self.arrow_to_pandas(c)
           for c in pa.Table.from_batches([batch]).itercolumns()]
{code}
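As a possible mitigation while this is being investigated (my assumption, not a confirmed fix): reducing the Arrow batch size may lower per-batch memory pressure during serialization. `spark.sql.execution.arrow.maxRecordsPerBatch` is a documented Spark 2.4 setting:

```python
# Sketch of a possible mitigation (assumes a live SparkSession named `spark`):
# cap the number of records per Arrow record batch (default is 10000).
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2000")
```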
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)