[ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16761607#comment-16761607
 ] 

Tao Luo edited comment on SPARK-24657 at 2/7/19 2:03 AM:
---------------------------------------------------------

Sure:
{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
{code}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
{code}

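Using broadcast succeeds:
{code:java}
from pyspark.sql.functions import broadcast

# Hint Spark to broadcast r2 so the planner uses a broadcast hash join instead of the sort-merge join
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}
Running on Spark 2.4.
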
Or if you prefer Scala:
{code:java}
// Run in spark-shell, where `spark` is already defined
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val r1 = spark.range(1, 1001).select(col("id").alias("timestamp1")).withColumn("value", rand())
val r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2")).withColumn("value2", rand())
// Sort-merge join over 200 shuffle partitions, then coalesce them all into one task
val joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner").coalesce(1)
joined.explain()
joined.show()
{code}


> SortMergeJoin may cause SparkOutOfMemory  in execution memory  because of not 
> cleanup resource when finished the merge join
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24657
>                 URL: https://issues.apache.org/jira/browse/SPARK-24657
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.3.1
>            Reporter: Joshuawangzj
>            Priority: Major
>
> In my SQL, I join three tables, all of which are small (about 2 MB). To avoid 
> the small-files problem, I use coalesce(1), but it throws an OOM exception: 
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>       at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>       at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>       at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>       at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>       at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
>       at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
>       at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
>       at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
>       at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
>       at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> After studying the source code, I found the cause. The task cannot allocate a 
> page (32 MB per page in my case) from the MemoryManager, because coalesce runs 
> 20 parent partitions in one task (spark.sql.shuffle.partitions=20), and after 
> the sort-merge join of each parent partition, the UnsafeExternalRowSorter does 
> not clean up some of the pages it allocated. After the 14th parent partition 
> (in my case), there is not enough execution memory left to acquire a page for 
> the sort. 
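The accumulation described above can be illustrated with a toy model in plain Python. This is a sketch under stated assumptions, not Spark code: `ExecutionMemoryPool` and `run_coalesced_task` are hypothetical names, the 512 MB pool is an assumed figure, and each parent partition is assumed to take one 32 MB page per join side and to leak the page of the side whose iterator was not exhausted. It shows how a per-partition leak that is harmless in isolation exhausts a fixed pool once coalesce(1) chains many partitions into one task.

{code:python}
# Toy model only (hypothetical names and numbers, not Spark's MemoryManager API).
MB = 1024 * 1024


class ExecutionMemoryPool:
    """Hypothetical stand-in for the execution memory available to one task."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.used = 0

    def acquire(self, n):
        # Simplification: all-or-nothing grant, echoing the
        # "Unable to acquire N bytes of memory, got 0" message.
        if self.used + n > self.capacity:
            raise MemoryError(f"Unable to acquire {n} bytes of memory, got 0")
        self.used += n

    def release(self, n):
        self.used -= n


def run_coalesced_task(pool, parent_partitions, page_size, leak_one_side):
    """Run every parent partition inside one task, as coalesce(1) does.

    Assumption: each partition's join takes one page per side; afterwards the
    exhausted side frees its page, and the other side frees its page only when
    leak_one_side is False. Returns the number of partitions that completed.
    """
    completed = 0
    for _ in range(parent_partitions):
        try:
            pool.acquire(page_size)  # page for the streamed-side sort
            pool.acquire(page_size)  # page for the buffered-side sort
        except MemoryError as err:
            print(f"partition {completed + 1}: {err}")
            return completed
        pool.release(page_size)  # the exhausted side cleans up
        if not leak_one_side:
            pool.release(page_size)  # the other side cleans up as well
        completed += 1
    return completed


# Assumed figures: a 512 MB pool, 32 MB pages, 20 parent partitions.
leaky = run_coalesced_task(ExecutionMemoryPool(512 * MB), 20, 32 * MB, leak_one_side=True)
clean = run_coalesced_task(ExecutionMemoryPool(512 * MB), 20, 32 * MB, leak_one_side=False)
print(leaky, clean)  # 15 20
{code}

With these assumed numbers the leaky run stops after 15 partitions while the run that frees both pages completes all 20; the partition at which a real task fails depends on the actual page size and memory configuration.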
> Why can't UnsafeExternalRowSorter clean up some of its pages after the join 
> for a parent partition finishes?
> After repeated experiments, I found that the problem is in 
> SortMergeJoinScanner. UnsafeExternalRowSorter cleans up its resources only 
> when its iterator is advanced to the end. But in SortMergeJoinScanner, when 
> streamedIterator reaches its end, bufferedIterator may not have, so the 
> sorter behind bufferedIterator never cleans up its resources, and vice versa.
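The cleanup-on-exhaustion behaviour described above can be modelled the same way. In this hypothetical sketch, `CleanupOnExhaustIterator` stands in for the iterator returned by UnsafeExternalRowSorter and releases its resources only when advanced past its last row, and `inner_merge_join` is a simplified inner merge join over unique keys that stops as soon as either input ends; neither is Spark's actual SortMergeJoinScanner. The inputs reuse the key ranges of the reproducer earlier in this thread.

{code:python}
# Toy model only: hypothetical classes, not Spark's SortMergeJoinScanner.


class CleanupOnExhaustIterator:
    """Stands in for a sorter's output iterator that frees its resources
    only when a caller advances it past the last row."""

    def __init__(self, name, rows, live_resources):
        self.name = name
        self._it = iter(sorted(rows))
        self._live = live_resources
        self._live.add(name)  # resources held while sorted rows are buffered

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._it)
        except StopIteration:
            self._live.discard(self.name)  # the only place cleanup happens
            raise


def inner_merge_join(streamed, buffered):
    """Simplified inner merge join on unique sorted keys. It stops as soon as
    either side is exhausted and never advances the other side further."""
    out = []
    s, b = next(streamed, None), next(buffered, None)
    while s is not None and b is not None:
        if s == b:
            out.append(s)
            s, b = next(streamed, None), next(buffered, None)
        elif s < b:
            s = next(streamed, None)
        else:
            b = next(buffered, None)
    return out


live = set()
streamed = CleanupOnExhaustIterator("streamed", range(1, 1001), live)
buffered = CleanupOnExhaustIterator("buffered", range(1000, 2001), live)
matches = inner_merge_join(streamed, buffered)
print(matches, live)  # [1000] {'buffered'}
{code}

Only the streamed side reaches its end, so the buffered side still holds its resources after the join returns, which is the situation the report describes.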
> Possible solutions:
> 1. When one iterator has been traversed to the end, advance the other 
> iterator to its end as well. This may decrease performance because of the 
> unnecessary traversal.
> 2. When one iterator has been traversed to the end, invoke the other sorter's 
> cleanup method directly. This would require a larger change to the source code.
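The two proposed solutions can be compared in a similar toy setting. `SorterIterator`, `inner_merge_join`, and `run` are hypothetical helpers, not Spark APIs: `strategy="drain"` models solution 1 (advance both iterators to the end), `strategy="cleanup"` models solution 2 (call an explicit cleanup method), and `"none"` reproduces the leak. The `rows_read` counter makes the extra traversal cost of solution 1 visible.

{code:python}
# Toy model only: hypothetical helpers, not Spark APIs.


class SorterIterator:
    """Sorter output iterator that frees its resources when exhausted and
    also offers an explicit, idempotent cleanup() (the idea behind solution 2)."""

    def __init__(self, name, rows, live):
        self.name = name
        self._it = iter(sorted(rows))
        self._live = live
        self._live.add(name)
        self.rows_read = 0

    def __iter__(self):
        return self

    def __next__(self):
        try:
            row = next(self._it)
        except StopIteration:
            self.cleanup()
            raise
        self.rows_read += 1
        return row

    def cleanup(self):
        self._live.discard(self.name)  # no-op if already released


def inner_merge_join(streamed, buffered, strategy):
    """Simplified inner merge join on unique sorted keys, followed by an
    optional end-of-join step chosen by `strategy`."""
    out = []
    s, b = next(streamed, None), next(buffered, None)
    while s is not None and b is not None:
        if s == b:
            out.append(s)
            s, b = next(streamed, None), next(buffered, None)
        elif s < b:
            s = next(streamed, None)
        else:
            b = next(buffered, None)
    for side in (streamed, buffered):
        if strategy == "drain":  # solution 1: advance each iterator to its end
            for _ in side:
                pass
        elif strategy == "cleanup":  # solution 2: release resources directly
            side.cleanup()
    return out


def run(strategy):
    live = set()
    streamed = SorterIterator("streamed", range(1, 1001), live)
    buffered = SorterIterator("buffered", range(1000, 2001), live)
    matches = inner_merge_join(streamed, buffered, strategy)
    return matches, sorted(live), buffered.rows_read


for strategy in ("none", "drain", "cleanup"):
    print(strategy, run(strategy))
{code}

Both fixes leave no resources held, but draining reads all 1001 buffered rows instead of 2, which is the performance concern raised for solution 1; the explicit cleanup avoids that cost at the price of a wider code change.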



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)