[ https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117062#comment-14117062 ]

Josh Rosen commented on SPARK-3333:
-----------------------------------

[~shivaram] and I discussed this; we have a few ideas about what might be 
happening.

I tried running {{sc.parallelize(1 to 
10).repartition(24000).keyBy(x=>x).reduceByKey(_+_).collect()}} in 1.0.2 and 
observed performance similarly slow to what I saw with the current 1.1.0 
release candidate.  When I modified the job to use fewer reducers 
({{reduceByKey(_+_, 4)}}), it completed quickly.  You can see similar behavior 
in Python by explicitly specifying a smaller number of reducers.
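
For example, here's a minimal PySpark sketch of that workaround (the partition 
count of 4 is just an illustrative choice; I haven't re-run this exact snippet):

{code}
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)

# Slow: reduceByKey inherits the parent's 24000 partitions by default.
slow = a.keyBy(lambda x: len(x)).reduceByKey(lambda x, y: x + y)

# Fast: explicitly cap the reduce side at 4 partitions.
fast = a.keyBy(lambda x: len(x)).reduceByKey(lambda x, y: x + y, 4)
print(fast.take(1))
{code}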

I think the issue here is that the overhead of sending and processing task 
completions grows linearly with the number of reducers.  Specifically, the 
uncompressed size of each ShuffleMapTask result is roughly O(numReducers), and 
there's an O(numReducers) processing cost for each task completion within 
DAGScheduler (since mapOutputLocations is O(numReducers)).
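
To put rough numbers on that (purely a back-of-envelope sketch; the per-entry 
cost below is an assumption, not a measured figure):

{code}
# Back-of-envelope only: assume each ShuffleMapTask result carries on the
# order of one byte of size metadata per reducer, before compression.
num_map_tasks = 24000   # repartition(24000) -> 24000 map tasks
num_reducers = 24000    # default reducer count matches the map side here
bytes_per_entry = 1     # assumed uncompressed cost per reducer entry

total_bytes = num_map_tasks * num_reducers * bytes_per_entry
print(total_bytes / 1024.0 / 1024.0)  # roughly 549 MB of task-result metadata
{code}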

This normally isn't a problem, but it can hurt performance for jobs with 
large numbers of extremely small map tasks (like this job, where nearly all of 
the map tasks are effectively no-ops).  For larger tasks, this cost should be 
masked by other, larger costs (such as the actual task processing time).

I'm not sure where the OOM is coming from, but the slow performance that 
you're observing here is probably due to the new default number of reducers 
(https://github.com/apache/spark/pull/1138 exposed this in Python by changing 
its defaults to match Scala Spark).
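
One possible workaround (a sketch only; whether it also avoids the OOM is 
untested here, and the value 4 is arbitrary) is to lower the default 
parallelism that the shuffle operations fall back to when no explicit 
partition count is given:

{code}
from pyspark import SparkConf, SparkContext

# Sketch: lower the default number of reduce partitions via configuration
# instead of passing numPartitions to every shuffle operation.
conf = SparkConf().setAppName("spark-3333-repro") \
                  .set("spark.default.parallelism", "4")
sc = SparkContext(conf=conf)
{code}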

As a result, I'm not sure that this is a regression from 1.0.2, since Scala 
jobs behave similarly there.  I think we already do some compression of the 
task results, and there are probably other improvements we can make to lower 
these overheads, but I think we should postpone that work to 1.2.0.

> Large number of partitions causes OOM
> -------------------------------------
>
>                 Key: SPARK-3333
>                 URL: https://issues.apache.org/jira/browse/SPARK-3333
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.1.0
>         Environment: EC2; 1 master; 1 slave; {{m3.xlarge}} instances
>            Reporter: Nicholas Chammas
>
> Here’s a repro for PySpark:
> {code}
> a = sc.parallelize(["Nick", "John", "Bob"])
> a = a.repartition(24000)
> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
> {code}
> This code runs fine on 1.0.2. It returns the following result in just over a 
> minute:
> {code}
> [(4, 'NickJohn')]
> {code}
> However, when I try this with 1.1.0-rc3 on an identically-sized cluster, it 
> runs for a very long time (at least 45 minutes) and then fails with 
> {{java.lang.OutOfMemoryError: Java heap space}}.
> Here is a stack trace taken from a run on 1.1.0-rc2:
> {code}
> >>> a = sc.parallelize(["Nick", "John", "Bob"])
> >>> a = a.repartition(24000)
> >>> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
> 14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent 
> heart beats: 175143ms exceeds 45000ms
> 14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(10, ip-10-138-18-106.ec2.internal, 33711, 0) with no recent 
> heart beats: 175359ms exceeds 45000ms
> 14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(19, ip-10-139-36-207.ec2.internal, 52208, 0) with no recent 
> heart beats: 173061ms exceeds 45000ms
> 14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent 
> heart beats: 176816ms exceeds 45000ms
> 14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(7, ip-10-236-145-200.ec2.internal, 40959, 0) with no recent 
> heart beats: 182241ms exceeds 45000ms
> 14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(4, ip-10-139-1-195.ec2.internal, 49221, 0) with no recent 
> heart beats: 178406ms exceeds 45000ms
> 14/08/29 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver 
> thread-3
> java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>     at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>     at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at 
> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>     at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>     at 
> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>     at 
> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Exception in thread "Result resolver thread-3" 14/08/29 21:56:26 ERROR 
> SendingConnection: Exception while reading SendingConnection to 
> ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014)
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>     at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>     at 
> org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>     at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>     at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at 
> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>     at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>     at 
> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>     at 
> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> 14/08/29 21:54:43 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(6, ip-10-137-1-139.ec2.internal, 42539, 0) with no recent 
> heart beats: 183978ms exceeds 45000ms
> 14/08/29 21:57:42 ERROR ConnectionManager: Corresponding SendingConnection to 
> ConnectionManagerId(ip-10-138-9-33.ec2.internal,41924) not found
> 14/08/29 21:57:51 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(11, ip-10-236-181-116.ec2.internal, 46847, 0) with no recent 
> heart beats: 178629ms exceeds 45000ms
> 14/08/29 21:57:43 ERROR ConnectionManager: Corresponding SendingConnection to 
> ConnectionManagerId(ip-10-137-1-139.ec2.internal,42539) not found
> 14/08/29 21:57:54 ERROR SendingConnection: Exception while reading 
> SendingConnection to ConnectionManagerId(ip-10-141-136-168.ec2.internal,42960)
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>     at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>     at 
> org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {code}


