Sun Fulin created SPARK-5711:
--------------------------------
Summary: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Key: SPARK-5711
URL: https://issues.apache.org/jira/browse/SPARK-5711
Project: Spark
Issue Type: Bug
Affects Versions: 1.2.0
Environment: hbase-0.98.6-cdh5.2.0 phoenix-4.2.2
Reporter: Sun Fulin
Recently we ran into performance issues when using Spark 1.2.0 to read data
from HBase and compute summaries.
Our scenario: read large data sets (possibly 100 GB or more) from HBase,
build an HBase RDD, transform it into a SchemaRDD, group by and aggregate the
data to produce much smaller summary data sets, and load the results into
HBase (via Phoenix).
Our main problem is that aggregating the large data sets into summary data
sets takes too long (more than 1 hour), which is far slower than we would
expect. We have attached the dump file; the stack trace from jstack is shown
below.
From the stack trace and the dump file, we can see that processing large
data sets causes the AppendOnlyMap to grow frequently, leading to a very large
number of map entries. Looking at the source code of
org.apache.spark.util.collection.AppendOnlyMap, we found that the map is
initialized with a capacity of 64, which is too small for our use case.
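The growth described above can be checked with simple arithmetic. The following sketch replays the resize rule as we read it in the Spark 1.2 AppendOnlyMap source; this is an assumption, not verified against every release: initial capacity 64, grow when the size exceeds 0.7 x capacity, and double the capacity on each grow. GrowthSimulation and simulate are hypothetical names; the sketch only counts resizes and rehashed entries and does not model hashing or memory cost.

```java
// Hypothetical simulation of the AppendOnlyMap resize rule as we read it
// (assumed): start at initialCapacity, grow when size > (long) (0.7 * capacity),
// double the capacity on each grow. Ignores any maximum-capacity limit.
class GrowthSimulation {
    static final double LOAD_FACTOR = 0.7;

    // Returns {growTable() calls, total entries rehashed, final capacity}
    // after inserting distinctKeys distinct keys.
    static long[] simulate(long distinctKeys, long initialCapacity) {
        long capacity = initialCapacity;
        long grows = 0;
        long rehashed = 0;
        long threshold = (long) (LOAD_FACTOR * capacity);
        while (distinctKeys > threshold) {
            // The grow fires when the size first exceeds the threshold,
            // so threshold + 1 live entries are rehashed into the new table.
            rehashed += threshold + 1;
            capacity *= 2;
            threshold = (long) (LOAD_FACTOR * capacity);
            grows++;
        }
        return new long[] {grows, rehashed, capacity};
    }

    public static void main(String[] args) {
        long keys = 10_000_000L; // hypothetical number of distinct keys in one task
        for (long initial : new long[] {64, 1L << 24}) {
            long[] r = simulate(keys, initial);
            System.out.printf("initialCapacity=%d: grows=%d, entriesRehashed=%d, finalCapacity=%d%n",
                    initial, r[0], r[1], r[2]);
        }
    }
}
```

Because the capacity doubles, the number of resizes grows only logarithmically with the number of distinct keys, but each resize rehashes every live entry; for 10 million keys the last resize alone moves several million entries. Calling simulate with a larger initial capacity (1 << 24 in main) shows how presizing would avoid the resizes for that key count.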
Thread 22432: (state = IN_JAVA)
- org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise)
- org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame)
- org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame)
- org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame)
- org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame)
- org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame)
- org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame)
- org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame)
- org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame)
- org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame)
- org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
- java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
Thread 22431: (state = IN_JAVA)
- org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise)
- org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame)
- org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame)
- org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame)
- org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame)
- org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame)
- org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame)
- org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame)
- org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame)
- org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame)
- org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
- java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
- java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
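Both threads are on the same path for each record: ExternalSorter.insertAll calls changeValue, which calls incrementSize, which calls growTable once the size passes the grow threshold. The following is a simplified, hypothetical Java sketch of that path; SketchAppendOnlyMap, growCount and the other names are illustrative, not Spark's API. It assumes open addressing over a power-of-two table with a 0.7 load factor and capacity doubling, which is our reading of AppendOnlyMap; Spark's version differs in details such as hash mixing, storage layout and null-key handling.

```java
import java.util.function.BiFunction;

// Hypothetical, simplified sketch of changeValue -> incrementSize -> growTable.
// Assumptions: open addressing, power-of-two capacity, load factor 0.7,
// capacity doubles on each grow. Not Spark's actual implementation.
class SketchAppendOnlyMap<K, V> {
    private static final double LOAD_FACTOR = 0.7;

    private int capacity;
    private int mask;
    private int growThreshold;
    private int curSize = 0;
    private Object[] keys;
    private Object[] values;
    int growCount = 0; // number of growTable() calls, for illustration only

    SketchAppendOnlyMap(int initialCapacity) {
        if (Integer.bitCount(initialCapacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        capacity = initialCapacity;
        mask = capacity - 1;
        growThreshold = (int) (LOAD_FACTOR * capacity);
        keys = new Object[capacity];
        values = new Object[capacity];
    }

    // Insert or update the value for key; updateFunc gets (hadValue, oldValue).
    @SuppressWarnings("unchecked")
    V changeValue(K key, BiFunction<Boolean, V, V> updateFunc) {
        int pos = key.hashCode() & mask;
        int delta = 1;
        while (true) {
            Object cur = keys[pos];
            if (cur == null) {             // empty slot: new key
                V newValue = updateFunc.apply(false, null);
                keys[pos] = key;
                values[pos] = newValue;
                incrementSize();           // may trigger growTable()
                return newValue;
            }
            if (cur.equals(key)) {         // existing key: merge into old value
                V newValue = updateFunc.apply(true, (V) values[pos]);
                values[pos] = newValue;
                return newValue;
            }
            pos = (pos + delta) & mask;    // triangular probing visits every slot
            delta++;
        }
    }

    int size() { return curSize; }

    private void incrementSize() {
        curSize++;
        if (curSize > growThreshold) {
            growTable();
        }
    }

    // Double the table and re-insert every live entry: O(curSize) work per grow.
    private void growTable() {
        int newCapacity = capacity * 2;
        int newMask = newCapacity - 1;
        Object[] newKeys = new Object[newCapacity];
        Object[] newValues = new Object[newCapacity];
        for (int i = 0; i < capacity; i++) {
            if (keys[i] != null) {
                int pos = keys[i].hashCode() & newMask;
                int delta = 1;
                while (newKeys[pos] != null) {
                    pos = (pos + delta) & newMask;
                    delta++;
                }
                newKeys[pos] = keys[i];
                newValues[pos] = values[i];
            }
        }
        keys = newKeys;
        values = newValues;
        capacity = newCapacity;
        mask = newMask;
        growThreshold = (int) (LOAD_FACTOR * capacity);
        growCount++;
    }

    public static void main(String[] args) {
        // Count occurrences of 100 distinct keys, like a map-side combine.
        SketchAppendOnlyMap<Integer, Integer> counts = new SketchAppendOnlyMap<>(64);
        for (int i = 0; i < 1000; i++) {
            counts.changeValue(i % 100, (hadValue, old) -> hadValue ? old + 1 : 1);
        }
        System.out.println("size=" + counts.size() + ", grows=" + counts.growCount);
        // prints "size=100, grows=2"
    }
}
```

With a starting capacity of 64, even 100 distinct keys force two full rehashes; a task that aggregates millions of distinct keys repeats this pattern many more times, each grow copying every live entry.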