[https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16763820#comment-16763820]

Tao Luo commented on SPARK-21492:
---------------------------------

If SortMergeJoinScanner doesn't consume the iterator from 
UnsafeExternalRowSorter entirely, the memory that UnsafeExternalSorter acquired 
from TaskMemoryManager is not released until the task ends. This leads to a 
memory leak, unnecessary spills, and ultimately an OutOfMemoryError: one page 
stays held per partition of the unused iterator.
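To make the failure mode concrete, here is a minimal plain-Scala sketch of the early-exit pattern (illustrative only, not Spark source; {{mergeJoin}} is my own name):
{code:java}
// An inner merge join over two sorted inputs stops as soon as EITHER side is
// exhausted. In Spark the undrained side is backed by UnsafeExternalRowSorter,
// so the pages its UnsafeExternalSorter took from TaskMemoryManager stay
// allocated until the task finishes. (Assumes distinct keys, to stay short.)
def mergeJoin(left: Iterator[Int], right: Iterator[Int]): Vector[(Int, Int)] = {
  val l = left.buffered
  val r = right.buffered
  val out = Vector.newBuilder[(Int, Int)]
  while (l.hasNext && r.hasNext) {
    if (l.head < r.head) l.next()
    else if (l.head > r.head) r.next()
    else out += ((l.next(), r.next()))
  }
  out.result() // one side is done; the other may still be undrained
}

// Mirrors the repro below: the ranges overlap only at 1000, so `right` is
// abandoned after a single match with most of its elements never consumed.
val matches = mergeJoin((1 until 1001).iterator, (1000 until 2001).iterator)
{code}
The PySpark reproduction: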



{code:java}
from pyspark.sql.functions import rand, col

# Force a sort-merge join by disabling broadcast joins.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# The two ranges overlap only at id 1000, so the join output is tiny.
r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")

# coalesce(1) pulls all 200 shuffle partitions into a single task.
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
{code:java}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}

{code:java}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0):
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
{code}
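The "got 0" is consistent with the pages being stranded for the task's whole lifetime: TaskMemoryManager reclaims whatever pages its consumers still hold only when the task itself completes. A sketch of that end-of-task hook (the {{onTaskEnd}} helper is my own name, not a Spark API; {{addTaskCompletionListener}} is real in Spark 2.4):
{code:java}
import org.apache.spark.TaskContext

// Run `cleanup` when the current task completes. A partially consumed
// sorter's pages are released via this end-of-task path, so until then the
// memory is unavailable to other consumers in the same task.
def onTaskEnd(cleanup: () => Unit): Unit =
  Option(TaskContext.get()).foreach { tc =>
    tc.addTaskCompletionListener[Unit](_ => cleanup())
  }
{code}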
 

Using broadcast succeeds:
{code:java}
from pyspark.sql.functions import broadcast

# Broadcasting r2 yields a broadcast hash join, so no sorted side is left undrained.
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}
Running on Spark 2.4. 

 

Or if you prefer Scala:
{code:java}
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
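For completeness, the broadcast workaround in Scala mirrors the Python one:
{code:java}
import org.apache.spark.sql.functions.broadcast

// Broadcasting r2 selects a broadcast hash join instead of a sort-merge join.
val joinedBroadcast = r1.join(broadcast(r2), col("timestamp1") === col("timestamp2"), "inner")
joinedBroadcast.explain()
joinedBroadcast.show()
{code}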
 

 

Just reproduced it in standalone mode using 
[https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz] 
and Python 3.7. Same code as above.

 

Succeeds with 1 thread: {{./bin/pyspark --master local[1]}}

Fails with more than 1 thread: {{./bin/pyspark --master local[4]}}
{code:java}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show(){code}
 
{code:java}
[Stage 2:>                                                          (0 + 1) / 1]
2019-02-06 17:12:27 WARN  TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135){code}
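As for why local[1] passes while local[4] fails: one plausible factor (my inference, not confirmed from this trace) is Spark's fair sharing of execution memory, where each of N concurrently active tasks may claim only between 1/(2N) and 1/N of the pool, so extra threads shrink the headroom the leaked pages eat into:
{code:java}
// Per-task execution memory bounds enforced by Spark's ExecutionMemoryPool:
// each of `activeTasks` tasks may hold between pool/(2N) and pool/N bytes.
def perTaskBounds(poolBytes: Long, activeTasks: Int): (Long, Long) =
  (poolBytes / (2L * activeTasks), poolBytes / activeTasks)

println(perTaskBounds(1L << 30, 1)) // local[1]: up to the full 1 GiB pool
println(perTaskBounds(1L << 30, 4)) // local[4]: at most 256 MiB per task
{code}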
 

> Memory leak in SortMergeJoin
> ----------------------------
>
>                 Key: SPARK-21492
>                 URL: https://issues.apache.org/jira/browse/SPARK-21492
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>            Reporter: Zhan Zhang
>            Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak 
> caused by the Sort. The memory is not released until the task ends, and cannot 
> be used by other operators, causing a performance drop or OOM.


