[ 
https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117770#comment-14117770
 ] 

Patrick Wendell commented on SPARK-3333:
----------------------------------------

[~nchammas], I think the default number of reducers could be the culprit. If 
you look at the actual job run in Spark 1.0.2, how many reducers are there in 
the shuffle? What happens if you run the code in Spark 1.1.0 and specify the 
same number of reducers that was chosen in 1.0.2? Is the performance then 
the same?
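
A minimal PySpark sketch of that experiment, assuming an existing SparkContext 
{{sc}} as in the shell. The names {{run_repro}} and {{num_reducers}} are made up 
for illustration, and the value to pass is whatever reducer count the 1.0.2 
shuffle actually shows, which is not stated here. {{reduceByKey}} takes an 
optional {{numPartitions}} argument, and that is what pins the reducer count:

```python
def run_repro(sc, num_reducers=None, num_partitions=24000):
    """Run the repro from the issue, optionally pinning the reducer count.

    sc            -- an existing SparkContext (assumed, e.g. the shell's `sc`)
    num_reducers  -- hypothetical parameter; None keeps the version's default
    """
    a = sc.parallelize(["Nick", "John", "Bob"])
    a = a.repartition(num_partitions)
    keyed = a.keyBy(lambda x: len(x))
    # numPartitions sets the number of reduce-side partitions of the shuffle.
    reduced = keyed.reduceByKey(lambda x, y: x + y, numPartitions=num_reducers)
    return reduced.take(1)
```

Calling {{run_repro(sc)}} on 1.1.0 should behave like the original repro, while 
{{run_repro(sc, num_reducers=N)}}, with N taken from the 1.0.2 run, would show 
whether the default is responsible.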

If this is the case, we should probably document it clearly in the release 
notes, since changing this default could have major implications for those 
relying on the default behavior.

> Large number of partitions causes OOM
> -------------------------------------
>
>                 Key: SPARK-3333
>                 URL: https://issues.apache.org/jira/browse/SPARK-3333
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.1.0
>         Environment: EC2; 1 master; 1 slave; {{m3.xlarge}} instances
>            Reporter: Nicholas Chammas
>
> Here’s a repro for PySpark:
> {code}
> a = sc.parallelize(["Nick", "John", "Bob"])
> a = a.repartition(24000)
> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
> {code}
> This code runs fine on 1.0.2. It returns the following result in just over a 
> minute:
> {code}
> [(4, 'NickJohn')]
> {code}
> However, when I try this with 1.1.0-rc3 on an identically-sized cluster, it 
> runs for a very long time (more than 45 minutes) and then fails with 
> {{java.lang.OutOfMemoryError: Java heap space}}.
> Here is a stack trace taken from a run on 1.1.0-rc2:
> {code}
> >>> a = sc.parallelize(["Nick", "John", "Bob"])
> >>> a = a.repartition(24000)
> >>> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
> 14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent 
> heart beats: 175143ms exceeds 45000ms
> 14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(10, ip-10-138-18-106.ec2.internal, 33711, 0) with no recent 
> heart beats: 175359ms exceeds 45000ms
> 14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(19, ip-10-139-36-207.ec2.internal, 52208, 0) with no recent 
> heart beats: 173061ms exceeds 45000ms
> 14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent 
> heart beats: 176816ms exceeds 45000ms
> 14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(7, ip-10-236-145-200.ec2.internal, 40959, 0) with no recent 
> heart beats: 182241ms exceeds 45000ms
> 14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(4, ip-10-139-1-195.ec2.internal, 49221, 0) with no recent 
> heart beats: 178406ms exceeds 45000ms
> 14/08/29 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver 
> thread-3
> java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>     at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>     at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at 
> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>     at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>     at 
> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>     at 
> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Exception in thread "Result resolver thread-3" 14/08/29 21:56:26 ERROR 
> SendingConnection: Exception while reading SendingConnection to 
> ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014)
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>     at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>     at 
> org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>     at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>     at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at 
> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>     at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>     at 
> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>     at 
> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>     at 
> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> 14/08/29 21:54:43 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(6, ip-10-137-1-139.ec2.internal, 42539, 0) with no recent 
> heart beats: 183978ms exceeds 45000ms
> 14/08/29 21:57:42 ERROR ConnectionManager: Corresponding SendingConnection to 
> ConnectionManagerId(ip-10-138-9-33.ec2.internal,41924) not found
> 14/08/29 21:57:51 WARN BlockManagerMasterActor: Removing BlockManager 
> BlockManagerId(11, ip-10-236-181-116.ec2.internal, 46847, 0) with no recent 
> heart beats: 178629ms exceeds 45000ms
> 14/08/29 21:57:43 ERROR ConnectionManager: Corresponding SendingConnection to 
> ConnectionManagerId(ip-10-137-1-139.ec2.internal,42539) not found
> 14/08/29 21:57:54 ERROR SendingConnection: Exception while reading 
> SendingConnection to ConnectionManagerId(ip-10-141-136-168.ec2.internal,42960)
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>     at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>     at 
> org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {code}


