[
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16762280#comment-16762280
]
Tao Luo commented on SPARK-24657:
---------------------------------
Just reproduced it in standalone mode using
[https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz]
Python 3.7. Same code as above.
Succeeds with 1 thread: ./bin/pyspark
Fails with >1 thread: ./bin/pyspark --master local[2]
{code:java}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show(){code}
{code:java}
[Stage 2:> (0 + 1) /
1]2019-02-06 17:12:27 WARN TaskMemoryManager:304 - Failed to allocate a page
(1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID
6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of
memory, got 0
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135){code}
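The leak pattern described below can be illustrated outside Spark with a minimal, self-contained sketch (the class and function names here are illustrative, not Spark's actual API): a sorter frees its "pages" only when its output iterator is driven to the end, and a merge-join-style consumer stops reading the buffered side as soon as the streamed side is exhausted, so the buffered side's cleanup never fires. Running many parent partitions in one task, as coalesce(1) does, accumulates the leaked pages:

```python
# Minimal sketch (not Spark's actual classes): a sorter that frees its
# pages only when its output iterator is fully consumed, mimicking
# UnsafeExternalRowSorter's cleanup-on-exhaustion behavior.

class Sorter:
    """Holds 'pages' until its iterator is drained to the end."""
    def __init__(self, rows):
        self.pages = [bytearray(16)]  # stand-in for allocated memory pages
        self._rows = rows

    def iterator(self):
        for r in self._rows:
            yield r
        self.pages.clear()  # cleanup fires only on full exhaustion

def merge_join_partition(streamed, buffered):
    """Consume the streamed side fully, but stop reading the buffered
    side as soon as the streamed side is exhausted (like the scanner
    described in the issue)."""
    s_it, b_it = streamed.iterator(), buffered.iterator()
    for _ in s_it:         # streamed side drained to the end -> cleaned up
        next(b_it, None)   # buffered side read only partially

# One task running many parent partitions (as coalesce(1) does):
sorters = []
for _ in range(20):
    s, b = Sorter(range(5)), Sorter(range(10))
    merge_join_partition(s, b)
    sorters.append((s, b))

leaked = sum(len(b.pages) for _, b in sorters)
print(leaked)  # -> 20: every buffered-side sorter still holds its page
```

Note that abandoning a Python generator does not run the code after its loop (closing it raises GeneratorExit at the suspended yield), which is why the partially-read buffered side never reaches its cleanup line, just as the buffered sorter in Spark never reaches its exhaustion hook.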
> SortMergeJoin may cause SparkOutOfMemory in execution memory because of not
> cleanup resource when finished the merge join
> ---------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-24657
> URL: https://issues.apache.org/jira/browse/SPARK-24657
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0, 2.3.1
> Reporter: Joshuawangzj
> Priority: Major
>
> My SQL joins three tables, all of them small (about 2 MB each). To solve
> the small-files issue, I use coalesce(1), but it throws an OOM exception:
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes
> of memory, got 0
> at
> org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
> at
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
> at
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
> at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
> at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
> at
> org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in
> stage 3.0 (TID 34, localhost, executor driver):
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes
> of memory, got 0
> at
> org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
> at
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
> at
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
> at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
> at
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
> at
> org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
> Finally, I tracked down the problem by studying the source code. The
> exception occurs because the task cannot allocate a page (in my case, the
> page size is 32M) from the MemoryManager: coalesce(1) runs 20 parent
> partitions in one task (spark.sql.shuffle.partitions=20), and after the
> sort merge join of each parent partition, the UnsafeExternalRowSorter
> cannot clean up some of the pages it allocated. After the 14th parent
> partition (in my case), there is not enough execution memory left to
> acquire a page for the sort.
> Why can UnsafeExternalRowSorter not clean up its pages after the join of
> a parent partition has finished?
> After repeated attempts, I found that the problem is in
> SortMergeJoinScanner. UnsafeExternalRowSorter cleans up its resources
> only when its iterator is advanced to the end. But in
> SortMergeJoinScanner, when the streamedIterator is exhausted, the
> bufferedIterator may not be, so the bufferedIterator cannot clean up its
> resources, and vice versa.
> The solution may be:
> 1. When one iterator is exhausted, advance the other iterator to its end
> so that its cleanup fires. This may decrease performance because of the
> unnecessary traversal.
> 2. When one iterator is exhausted, invoke the sorter's cleanup method
> directly. This requires a larger change to the source code.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]