[
https://issues.apache.org/jira/browse/SPARK-13450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15812508#comment-15812508
]
Tejas Patil edited comment on SPARK-13450 at 1/9/17 6:48 PM:
-------------------------------------------------------------
I have seen this problem a couple times in prod while trying out jobs over
Spark. There have been some discussions in the jira and here are my comments on
those:
- How to reproduce ? As [~shenhong] said, even in my case there were keys in
the joined relation which were skewed. I was able to grab a heap dump which
shows the array buffer grown more than a GB :
https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png
- [~hvanhovell] had some suggestions about using Cartesian Join OR co-group. In
my case, our users are running Hive SQL queries as-is over Spark. If there are
one-off such cases, we could have done that but with automated migration of
several jobs, we want the query to just work. OOMs lead to un-reliable behavior
and affects users.
- I looked at `Window.scala` but it only works for unsafe rows. In sort merge
join, we may or may not have unsafe rows.
- There was a PR associated with this jira
(https://github.com/apache/spark/pull/11386) but its inactive. I tried to pick
it up but it does not apply. Its basically copying ExternalAppendOnlyMap code
and introducing a `Buffer` version of it for lists. Instead of taking care of
merge conflicts, I was able to implement buffer version by reusing
`ExternalAppendOnlyMap` code. Will submit a fresh PR after testing it.
{code}
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
at
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.bufferMatchingRows(SortMergeJoinExec.scala:756)
at
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:660)
at
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$1.advanceNext(SortMergeJoinExec.scala:137)
at
org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:186)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:355)
at
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:103)
at
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:94)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
was (Author: tejasp):
I have seen this problem a couple times in prod while trying out jobs over
Spark. There have been some discussions in the jira and here are my comments on
those:
- How to reproduce ? As [~shenhong] said, even in my case there were keys in
the joined relation which were skewed. I was able to grab a heap dump (see
attached image) which shows the array buffer grown more than a GB.
- [~hvanhovell] had some suggestions about using Cartesian Join OR co-group. In
my case, our users are running Hive SQL queries as-is over Spark. If there are
one-off such cases, we could have done that but with automated migration of
several jobs, we want the query to just work. OOMs lead to un-reliable behavior
and affects users.
- I looked at `Window.scala` but it only works for unsafe rows. In sort merge
join, we may or may not have unsafe rows.
- There was a PR associated with this jira
(https://github.com/apache/spark/pull/11386) but its inactive. I tried to pick
it up but it does not apply. Its basically copying ExternalAppendOnlyMap code
and introducing a `Buffer` version of it for lists. Instead of taking care of
merge conflicts, I was able to implement buffer version by reusing
`ExternalAppendOnlyMap` code. Will submit a fresh PR after testing it.
{code}
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
at
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.bufferMatchingRows(SortMergeJoinExec.scala:756)
at
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:660)
at
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$1.advanceNext(SortMergeJoinExec.scala:137)
at
org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:186)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:355)
at
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:103)
at
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:94)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
> SortMergeJoin will OOM when join rows have lot of same keys
> -----------------------------------------------------------
>
> Key: SPARK-13450
> URL: https://issues.apache.org/jira/browse/SPARK-13450
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0, 2.0.2, 2.1.0
> Reporter: Hong Shen
> Attachments: heap-dump-analysis.png
>
>
> When I run a sql with join, task throw java.lang.OutOfMemoryError and sql
> failed. I have set spark.executor.memory 4096m.
> SortMergeJoin use a ArrayBuffer[InternalRow] to store bufferedMatches, if
> the join rows have a lot of same key, it will throw OutOfMemoryError.
> {code}
> /** Buffered rows from the buffered side of the join. This is empty if
> there are no matches. */
> private[this] val bufferedMatches: ArrayBuffer[InternalRow] = new
> ArrayBuffer[InternalRow]
> {code}
> Here is the stackTrace:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy(Native Method)
> org.xerial.snappy.Snappy.arrayCopy(Snappy.java:84)
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:190)
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:163)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> java.io.DataInputStream.readLong(DataInputStream.java:416)
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:71)
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:79)
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:136)
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:123)
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:84)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoin.scala:300)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.bufferMatchingRows(SortMergeJoin.scala:329)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoin.scala:229)
> org.apache.spark.sql.execution.joins.SortMergeJoin$$anonfun$doExecute$1$$anon$1.advanceNext(SortMergeJoin.scala:105)
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:741)
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:741)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:337)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:301)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:337)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:301)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> org.apache.spark.scheduler.Task.run(Task.scala:89)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:215)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:744)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]