[
https://issues.apache.org/jira/browse/SPARK-50462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alberto Andreotti updated SPARK-50462:
--------------------------------------
Description:
+*Observed Behavior:*+
When an explode() operation is performed and copies of row elements are
created, memory grows in proportion to the number of new copies.
+*Expected Behavior:*+
Memory usage should stay constant regardless of the number of new rows,
because row elements are {*}+immutable+{*}, so there is no point in keeping
multiple copies. At most, memory should grow with the number of partitions.
+*Discussion:*+
This is especially important when the duplicated data is a heavy buffer, for
example the contents of a binary file. You don't want multiple copies of that
buffer created in memory, and it isn't necessary, because explode() operations
don't change the value of the elements.
Consider this example:
{code:python}
from pyspark.sql.functions import lit, col, explode, array

# Load a single large binary file as one row with a 'content' column.
my_df = spark.read.format("binaryFile").load(my_200MB_file)

# Memory consumption should be independent of this value.
K = 14

# An array column [0, 1, ..., K-1], added to each row.
ranges = array([lit(i) for i in range(K)])

# Add the array column and explode it: each input row becomes K rows,
# all sharing the same 'content' value.
expanded_df = (
    my_df
    .withColumn("range", ranges)
    .select(col("content"), explode(col("range")).alias("index"))
)

# Do something with the DataFrame.
expanded_df.write.parquet("./pepe.parquet")
{code}
The number of copies of the binary buffer shouldn't change with 'K'.
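The sharing-versus-copying distinction being requested can be sketched in plain Python (an analogy only, not a model of Spark's internal row format): replicating K references to an immutable buffer costs almost nothing, while materializing K copies scales with K.

```python
import sys

K = 14
buf = bytes(1_000_000)  # stand-in for a heavy binary payload

# K rows that share one immutable buffer: ~1 MB plus K references.
shared = [(buf, i) for i in range(K)]

# K rows that each carry their own copy: ~K MB.
copied = [(bytes(bytearray(buf)), i) for i in range(K)]

# Every shared row points at the same object; no data was duplicated.
assert all(row[0] is buf for row in shared)

# The copied rows hold K independent buffers.
copied_bytes = sum(sys.getsizeof(row[0]) for row in copied)
assert copied_bytes >= K * len(buf)
```

If explode() kept a single reference per element, memory would behave like the `shared` case; the observed behavior matches the `copied` case.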
> Spark creates unnecessary copies of data during explode operations
> ------------------------------------------------------------------
>
> Key: SPARK-50462
> URL: https://issues.apache.org/jira/browse/SPARK-50462
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.0
> Environment: Ubuntu 22, Python 3.8, Spark 3.5.0.
> Reporter: Alberto Andreotti
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]