[
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212522#comment-16212522
]
Bartosz Mścichowski commented on SPARK-21492:
---------------------------------------------
Here's a script that exposes a memory leak during SortMergeJoin in Spark 2.2.0;
maybe it will be helpful.
The leak shows up when the following code is executed in a local spark-shell.
{{--conf spark.sql.autoBroadcastJoinThreshold=-1}} may be needed to ensure a
sort-merge join is chosen rather than a broadcast join.
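One way to start such a shell (assuming a local Spark 2.2.0 build; the exact
path to spark-shell may differ):
{noformat}
./bin/spark-shell --master local[*] --conf spark.sql.autoBroadcastJoinThreshold=-1
{noformat}
The script itself: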
{noformat}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val table1Key = "t1_key"
val table1Value = "t1_value"
val table2Key = "t2_key"
val table2Value = "t2_value"
// Schemas for two single-row tables keyed by an integer column.
val table1Schema = StructType(List(
  StructField(table1Key, IntegerType),
  StructField(table1Value, DoubleType)
))
val table2Schema = StructType(List(
  StructField(table2Key, IntegerType),
  StructField(table2Value, DoubleType)
))
val table1 = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(1, 2.0))),
  table1Schema
)
val table2 = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(1, 4.0))),
  table2Schema
)
// Repartition by key and aggregate, so each join input is shuffled and sorted.
val t1 = table1.repartition(col(table1Key)).groupBy(table1Key).avg()
val t2 = table2.repartition(col(table2Key)).groupBy(table2Key).avg()
val joinedDF = t1.join(t2, t1(table1Key) === t2(table2Key))
joinedDF.explain()
// == Physical Plan ==
// *SortMergeJoin [t1_key#2], [t2_key#9], Inner
// :- *Sort [t1_key#2 ASC NULLS FIRST], false, 0
// :  +- *HashAggregate(keys=[t1_key#2], functions=[avg(cast(t1_key#2 as bigint)), avg(t1_value#3)])
// :     +- *HashAggregate(keys=[t1_key#2], functions=[partial_avg(cast(t1_key#2 as bigint)), partial_avg(t1_value#3)])
// :        +- Exchange hashpartitioning(t1_key#2, 200)
// :           +- *Filter isnotnull(t1_key#2)
// :              +- Scan ExistingRDD[t1_key#2,t1_value#3]
// +- *Sort [t2_key#9 ASC NULLS FIRST], false, 0
//    +- *HashAggregate(keys=[t2_key#9], functions=[avg(cast(t2_key#9 as bigint)), avg(t2_value#10)])
//       +- *HashAggregate(keys=[t2_key#9], functions=[partial_avg(cast(t2_key#9 as bigint)), partial_avg(t2_value#10)])
//          +- Exchange hashpartitioning(t2_key#9, 200)
//             +- *Filter isnotnull(t2_key#9)
//                +- Scan ExistingRDD[t2_key#9,t2_value#10]
joinedDF.show()
// The 'show' action yields a lot of:
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 8
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 9
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 11
{noformat}
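The warnings appear only at task end because that is when the executor's
TaskMemoryManager reclaims whatever memory the operators did not release
themselves. Operators that allocate managed memory normally register a
task-completion callback to hand it back; a minimal sketch of that pattern in
Scala (the {{releaseSorterPages}} helper is hypothetical, standing in for the
real sorter cleanup):
{noformat}
import org.apache.spark.TaskContext

// Runs inside a task: the listener fires when the task finishes, so the
// pages go back to the memory manager even if a downstream operator
// stopped consuming the iterator early.
TaskContext.get().addTaskCompletionListener { _ =>
  releaseSorterPages()  // hypothetical helper that frees the sorter's pages
}
{noformat}
The leak reported here means SortMergeJoin keeps the sort buffers of a
non-exhausted side allocated until that callback fires, instead of releasing
them as soon as the join no longer needs them.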
> Memory leak in SortMergeJoin
> ----------------------------
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Zhan Zhang
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory
> leak caused by the Sort. The memory is not released until the task ends, and
> it cannot be used by other operators, causing performance degradation or OOM.
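As a self-contained illustration of the failure mode described above (plain
Scala, not Spark internals; all names are hypothetical): a sorter that frees
its buffer only when its iterator is exhausted keeps that buffer allocated for
the rest of the task if the consumer stops early.
{noformat}
// Hypothetical sketch, not Spark code: the buffer is released only when
// the iterator is fully consumed, so a consumer that stops early (e.g. a
// join that needs no more rows from this side) leaves it allocated.
class SortedIterator(rows: Array[Int]) extends Iterator[Int] {
  private val n = rows.length
  private var buffer: Array[Int] = rows.sorted  // stands in for the sorter's pages
  private var i = 0
  def hasNext: Boolean = i < n
  def next(): Int = {
    val r = buffer(i)
    i += 1
    if (i == n) buffer = null  // freed only on exhaustion
    r
  }
}
{noformat}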