[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212522#comment-16212522 ]

Bartosz Mścichowski commented on SPARK-21492:
---------------------------------------------

Here's a script that exposes a memory leak during SortMergeJoin in Spark 2.2.0; 
maybe it will be helpful.
The leak shows up when the following code is executed in a local spark-shell. 
{{--conf spark.sql.autoBroadcastJoinThreshold=-1}} may be needed so that the 
join is not planned as a broadcast join.
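
For reference, such a shell can be started like this (a plain local spark-shell; 
only the broadcast threshold is changed):

{noformat}
./bin/spark-shell --master "local[*]" --conf spark.sql.autoBroadcastJoinThreshold=-1
{noformat}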

{noformat}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val table1Key = "t1_key"
val table1Value = "t1_value"

val table2Key = "t2_key"
val table2Value = "t2_value"

val table1Schema = StructType(List(
    StructField(table1Key, IntegerType),
    StructField(table1Value, DoubleType)
))

val table2Schema = StructType(List(
    StructField(table2Key, IntegerType),
    StructField(table2Value, DoubleType)
))

val table1 = spark.createDataFrame(
    rowRDD = spark.sparkContext.parallelize(Seq(
        Row(1, 2.0)
    )),
    schema = table1Schema
)

val table2 = spark.createDataFrame(
    rowRDD = spark.sparkContext.parallelize(Seq(
        Row(1, 4.0)
    )),
    schema = table2Schema
)


// Shuffle and aggregate each side so the join below is planned as a SortMergeJoin.
val t1 = table1.repartition(col(table1Key)).groupBy(table1Key).avg()
val t2 = table2.repartition(col(table2Key)).groupBy(table2Key).avg()

val joinedDF = t1.join(t2).where(t1(table1Key) === t2(table2Key))

joinedDF.explain()
// == Physical Plan ==
// *SortMergeJoin [t1_key#2], [t2_key#9], Inner
// :- *Sort [t1_key#2 ASC NULLS FIRST], false, 0
// :  +- *HashAggregate(keys=[t1_key#2], functions=[avg(cast(t1_key#2 as bigint)), avg(t1_value#3)])
// :     +- *HashAggregate(keys=[t1_key#2], functions=[partial_avg(cast(t1_key#2 as bigint)), partial_avg(t1_value#3)])
// :        +- Exchange hashpartitioning(t1_key#2, 200)
// :           +- *Filter isnotnull(t1_key#2)
// :              +- Scan ExistingRDD[t1_key#2,t1_value#3]
// +- *Sort [t2_key#9 ASC NULLS FIRST], false, 0
//    +- *HashAggregate(keys=[t2_key#9], functions=[avg(cast(t2_key#9 as bigint)), avg(t2_value#10)])
//       +- *HashAggregate(keys=[t2_key#9], functions=[partial_avg(cast(t2_key#9 as bigint)), partial_avg(t2_value#10)])
//          +- Exchange hashpartitioning(t2_key#9, 200)
//             +- *Filter isnotnull(t2_key#9)
//                +- Scan ExistingRDD[t2_key#9,t2_value#10]
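// Note the two *Sort operators feeding *SortMergeJoin above; per the issue
// description it is their task-managed memory that leaks.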

joinedDF.show()
// The 'show' action yields a lot of:
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 8
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 9
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 11
{noformat}
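
To make the mechanism described in the issue concrete, here is a minimal 
plain-Scala sketch of the pattern, not Spark's internals (SortedBuffer and the 
callback list are invented for illustration): the sort buffers rows in 
task-managed memory that is only freed by a task-completion callback, so when 
the inner join stops pulling from one side early, that side's memory stays 
allocated until the task ends.

{noformat}
import scala.collection.mutable.ArrayBuffer

object LeakSketch {
  // Stands in for the task's completion listeners.
  val taskCompletionCallbacks = ArrayBuffer.empty[() => Unit]

  // Stands in for a Sort operator's task-managed row buffer.
  class SortedBuffer(rows: Seq[Int]) {
    private var pages: Option[Array[Int]] = Some(rows.sorted.toArray) // "allocated" pages
    taskCompletionCallbacks += (() => free()) // release is hooked to task end only
    def iterator: Iterator[Int] = pages.get.iterator
    def free(): Unit = { pages = None }
  }

  def main(args: Array[String]): Unit = {
    val left  = new SortedBuffer(1 to 1000000) // large side
    val right = new SortedBuffer(Seq(1))       // exhausted after one row
    // An inner merge join stops as soon as one side runs out of rows...
    left.iterator.zip(right.iterator).foreach(_ => ())
    // ...but nothing on the join path frees left's pages, so they stay
    // allocated here -- the "managed memory leak" from the warnings above.
    // They are reclaimed only when the task finishes:
    taskCompletionCallbacks.foreach(_.apply())
  }
}
{noformat}

This also matches when the warnings appear: the executor only checks for 
unreleased managed memory as a task completes, and until then the memory 
cannot be reused by other operators.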


> Memory leak in SortMergeJoin
> ----------------------------
>
>                 Key: SPARK-21492
>                 URL: https://issues.apache.org/jira/browse/SPARK-21492
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Zhan Zhang
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak 
> caused by the Sort. The memory is not released until the task ends, and cannot 
> be used by other operators, causing a performance drop or OOM.


