Shardul Mahadik created SPARK-44379:
---------------------------------------
Summary: Broadcast Joins taking up too much memory
Key: SPARK-44379
URL: https://issues.apache.org/jira/browse/SPARK-44379
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.1
Reporter: Shardul Mahadik
Context: After migrating to Spark 3 with AQE, we saw a significant increase in
driver and executor memory usage in our jobs which contains star joins. By
analyzing heapdump, we saw that majority of the memory was being taken up by
{{UnsafeHashedRelation}} used for broadcast joins; in this case there were 92
broadcast joins in the query.
!image-2023-07-11-10-41-02-251.png|width=851,height=70!
This took up over 6GB of total memory, even though every table being
broadcasted was around ~1MB and hence should only have been ~100MB total. I
found that this is because {{BytesToBytesMap}} used within
{{UnsafeHashedRelation}} allocates memory in ["pageSize"
increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117]
which in our case was 64MB. Based on the [default page size
calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
this should be the case for any container with > 1 GB of memory (assuming
executor.cores = 1) which is far too common. Thus in our case, most of the
memory requested by {{BytesToBytesMap}} was un-utilized with just trailing 0s.
!image-2023-07-11-10-52-59-553.png|width=389,height=101!
I think this is a major inefficiency for broadcast joins (especially star
joins). I think there are a few ways to tackle the problem.
1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does reduce
the memory consumption of broadcast joins, but I am not sure what it implies
for the rest of Spark machinery
2) Add a "finalize" operation to {{BytesToBytesMap}} which is called after all
values are added to the map and allocates a new page only for the required
bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of
keys and values, and use those during deserialization to only request the
required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the
estimated data size of broadcast joins.
I believe Option 3 would be simple enough to implement and I have a POC PR
which I will post soon, but I am interested in knowing other people's thoughts
here.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]