[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16763820#comment-16763820 ]
Tao Luo commented on SPARK-21492:
---------------------------------

If SortMergeJoinScanner does not consume the iterator from UnsafeExternalRowSorter entirely, the memory that UnsafeExternalSorter acquired from TaskMemoryManager is never released until the task ends. This causes a memory leak, unnecessary spills, and eventually an OOME: one page is held per partition of the unconsumed iterator.

{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}

{noformat}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{noformat}

{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
{noformat}

Using broadcast succeeds:

{code:python}
# broadcast the smaller side instead of sort-merge joining
from pyspark.sql.functions import broadcast

joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner")
{code}

This was run on Spark 2.4. Or, if you prefer Scala:

{code:scala}
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
{code}

Just reproduced it in standalone mode using [https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz] with Python 3.7, same code as above.

Succeeds with 1 thread: {{./bin/pyspark --master local[1]}}
Fails with more than 1 thread: {{./bin/pyspark --master local[4]}}

{code:python}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show()
{code}

{code:java}
[Stage 2:> (0 + 1) / 1]
2019-02-06 17:12:27 WARN TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
{code}

> Memory leak in SortMergeJoin
> ----------------------------
>
>                 Key: SPARK-21492
>                 URL: https://issues.apache.org/jira/browse/SPARK-21492
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>            Reporter: Zhan Zhang
>            Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak
> caused by the Sort. The memory is not released until the task ends, and cannot
> be used by other operators, causing a performance drop or OOM.
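To make the leak mechanism concrete, here is a minimal, self-contained Python model. The class names mirror Spark's but the code is purely illustrative, not Spark's implementation: an iterator acquires a page up front and only gives it back when fully drained, which is the pre-fix behavior described above.

{code:python}
class TaskMemoryManager:
    """Toy tracker of memory granted to consumers within one task."""
    def __init__(self, total):
        self.total = total
        self.used = 0

    def acquire(self, n):
        if self.used + n > self.total:
            raise MemoryError(f"Unable to acquire {n} bytes of memory, got 0")
        self.used += n

    def release(self, n):
        self.used -= n


class SortedIterator:
    """Acquires one page up front; frees it only when fully drained."""
    PAGE = 65536

    def __init__(self, mm, rows):
        self.mm = mm
        self.mm.acquire(self.PAGE)
        self.freed = False
        self._rows = iter(rows)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._rows)
        except StopIteration:
            if not self.freed:          # page released only on exhaustion
                self.mm.release(self.PAGE)
                self.freed = True
            raise


mm = TaskMemoryManager(total=3 * 65536)

partial = SortedIterator(mm, range(10))
next(partial)                  # the join reads one row, then abandons this side

drained = SortedIterator(mm, range(10))
for _ in drained:              # fully consumed: its page is given back
    pass

# The abandoned iterator still pins its page until the task ends.
print(mm.used)                 # 65536
{code}

With several partitions per task, each abandoned iterator pins its own page, which is why the repro above runs out of memory once more than one thread competes for the same pool.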
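One mitigation direction, sketched below with hypothetical names (this is a sketch of the idea, not the actual Spark patch), is to give the sorted iterator an explicit cleanup hook that the consumer calls as soon as it stops reading a side, so the page becomes available to other consumers in the same task instead of staying pinned until task completion.

{code:python}
class TaskMemoryManager:
    """Toy tracker of memory granted to consumers within one task."""
    def __init__(self, total):
        self.total = total
        self.used = 0

    def acquire(self, n):
        if self.used + n > self.total:
            raise MemoryError(f"Unable to acquire {n} bytes of memory, got 0")
        self.used += n

    def release(self, n):
        self.used -= n


class SortedIterator:
    """Like before, but with an explicit, idempotent cleanup hook."""
    PAGE = 65536

    def __init__(self, mm, rows):
        self.mm = mm
        self.mm.acquire(self.PAGE)
        self.freed = False
        self._rows = iter(rows)

    def cleanup_resources(self):
        """Release the page now; safe to call more than once."""
        if not self.freed:
            self.mm.release(self.PAGE)
            self.freed = True

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._rows)
        except StopIteration:
            self.cleanup_resources()    # still free eagerly on exhaustion
            raise


# Budget of exactly one page: without the eager cleanup call, constructing
# the second sorter below would fail with "Unable to acquire ... got 0".
mm = TaskMemoryManager(total=65536)

left = SortedIterator(mm, range(5))
next(left)                 # partially consumed...
left.cleanup_resources()   # ...then released as soon as the join abandons it

right = SortedIterator(mm, range(5))   # succeeds: the page was reclaimed
print(mm.used)                         # 65536, all of it held by `right`
{code}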