Jelmer Kuperus created SPARK-45675:
--------------------------------------
Summary: Specify number of partitions when creating spark dataframe from pandas dataframe
Key: SPARK-45675
URL: https://issues.apache.org/jira/browse/SPARK-45675
Project: Spark
Issue Type: Improvement
Components: Pandas API on Spark
Affects Versions: 3.5.0
Reporter: Jelmer Kuperus
When converting a large pandas DataFrame to a Spark DataFrame like so:
{code:python}
import pandas as pd

pdf = pd.DataFrame([{"board_id": "3074457346698037360_0", "file_name": "board-content", "value": "A" * 119251}
                    for _ in range(20000)])
# `spark` is an existing SparkSession; the "delta" format requires Delta Lake.
spark.createDataFrame(pdf).write.mode("overwrite").format("delta").saveAsTable("catalog.schema.table")
{code}
you can encounter the following error:
{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 11:1 was 366405365 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
{noformat}
As far as I can tell, Spark first converts the pandas DataFrame into a Python
list and then constructs an RDD from that list. This means that the parallelism
is determined by the value of spark.sparkContext.defaultParallelism, so if the
pandas DataFrame is very large and the number of available cores is low, you end
up with very large tasks that exceed the limit imposed on task size
(spark.rpc.message.maxSize).
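A rough back-of-the-envelope sketch of why the tasks become too large. It assumes rows are split evenly across partitions and counts only the raw ASCII string bytes of the reproduction above, ignoring serialization overhead (the failing task in the error was larger than this estimate would suggest). The helper names and the defaultParallelism of 8 are hypothetical, chosen only for illustration.

```python
import math

# spark.rpc.message.maxSize as reported in the error message (256 MiB).
MAX_MESSAGE_BYTES = 268435456


def per_task_bytes(total_bytes: int, num_partitions: int) -> int:
    """Hypothetical helper: approximate raw payload per task for an even split."""
    return math.ceil(total_bytes / num_partitions)


def min_partitions(total_bytes: int, max_bytes: int = MAX_MESSAGE_BYTES) -> int:
    """Hypothetical helper: fewest partitions keeping each raw payload <= max_bytes."""
    return max(1, math.ceil(total_bytes / max_bytes))


# Raw string payload of one row in the reproduction (ASCII, so 1 byte per char).
row_bytes = len("3074457346698037360_0") + len("board-content") + 119251
total_bytes = 20000 * row_bytes  # roughly 2.4 GB before any serialization overhead

# Assumption: a small cluster where defaultParallelism == 8.
print(per_task_bytes(total_bytes, 8))  # 298212500, already above the limit
print(min_partitions(total_bytes))     # 9
```

Because real serialized tasks carry extra overhead, the partition count actually needed would be somewhat higher than this lower bound.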
Methods like spark.sparkContext.parallelize let you pass in the number of
partitions of the resulting dataset. Having a similar capability when creating a
DataFrame from a pandas DataFrame would make a lot of sense. Right now, the only
workaround I can think of is changing the value of spark.default.parallelism,
but that setting applies to the whole application rather than to a single
conversion.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)