[ https://issues.apache.org/jira/browse/SPARK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117062#comment-14117062 ]
Josh Rosen commented on SPARK-3333:
-----------------------------------

[~shivaram] and I discussed this; we have a few ideas about what might be happening. I tried running {{sc.parallelize(1 to 10).repartition(24000).keyBy(x=>x).reduceByKey(_+_).collect()}} in 1.0.2 and observed similarly slow performance to what I saw in the current 1.1.0 release candidate. When I modified my job to use fewer reducers ({{reduceByKey(_+_, 4)}}), the job completed quickly. You can see similar behavior in Python by explicitly specifying a smaller number of reducers.

I think the issue here is that the overhead of sending and processing task completions is O(numReducers). Specifically, the uncompressed size of ShuffleMapTask results is roughly O(numReducers), and there's an O(numReducers) processing cost for task completions within DAGScheduler (since mapOutputLocations is O(numReducers)). This normally isn't a problem, but it can hurt performance for jobs with large numbers of extremely small map tasks (like this job, where nearly all of the map tasks are effectively no-ops). For larger tasks, this cost should be masked by larger overheads, such as task processing time.

I'm not sure where the OOM is coming from, but the slow performance that you're observing here is probably due to the new default number of reducers (https://github.com/apache/spark/pull/1138 exposed this in Python by changing its defaults to match Scala Spark). As a result, I'm not sure that this is a regression from 1.0.2, since it behaves similarly for Scala jobs. I think we already do some compression of the task results, and there are probably other improvements that we can make to lower these overheads, but I think we should postpone that to 1.2.0.
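[Editor's note] To make the workaround above concrete, here is a minimal PySpark sketch (not part of the original comment) of the repro from the quoted issue below, with the reducer count capped via {{reduceByKey}}'s optional {{numPartitions}} argument; {{sc}} is assumed to be the SparkContext provided by the PySpark shell. If the O(numReducers) accounting above holds, the default run asks the driver to handle on the order of 24000 map tasks × 24000 reducers ≈ 5.8 × 10^8 per-reducer size entries, whereas capping the reducers at 4 cuts that to 24000 × 4 = 96,000.

{code}
# Sketch only: the issue's repro with an explicitly small number of reducers.
# Assumes `sc` is the SparkContext from the PySpark shell.
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)

# Passing numPartitions=4 caps the reduce side of the shuffle, so (per the
# O(numReducers) argument above) each of the ~24000 tiny map tasks should
# report only 4 per-reducer output sizes instead of 24000.
result = a.keyBy(lambda x: len(x)) \
          .reduceByKey(lambda x, y: x + y, numPartitions=4) \
          .take(1)

print(result)  # the issue's 1.0.2 run returned [(4, 'NickJohn')]
{code}

The Scala equivalent is the second argument shown in the comment above, {{reduceByKey(_+_, 4)}}.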
> Large number of partitions causes OOM
> -------------------------------------
>
> Key: SPARK-3333
> URL: https://issues.apache.org/jira/browse/SPARK-3333
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.1.0
> Environment: EC2; 1 master; 1 slave; {{m3.xlarge}} instances
> Reporter: Nicholas Chammas
>
> Here’s a repro for PySpark:
> {code}
> a = sc.parallelize(["Nick", "John", "Bob"])
> a = a.repartition(24000)
> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
> {code}
> This code runs fine on 1.0.2. It returns the following result in just over a minute:
> {code}
> [(4, 'NickJohn')]
> {code}
> However, when I try this with 1.1.0-rc3 on an identically-sized cluster, it runs for a very, very long time (at least > 45 min) and then fails with {{java.lang.OutOfMemoryError: Java heap space}}.
> Here is a stack trace taken from a run on 1.1.0-rc2:
> {code}
> >>> a = sc.parallelize(["Nick", "John", "Bob"])
> >>> a = a.repartition(24000)
> >>> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
> 14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent heart beats: 175143ms exceeds 45000ms
> 14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(10, ip-10-138-18-106.ec2.internal, 33711, 0) with no recent heart beats: 175359ms exceeds 45000ms
> 14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(19, ip-10-139-36-207.ec2.internal, 52208, 0) with no recent heart beats: 173061ms exceeds 45000ms
> 14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent heart beats: 176816ms exceeds 45000ms
> 14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, ip-10-236-145-200.ec2.internal, 40959, 0) with no recent heart beats: 182241ms exceeds 45000ms
> 14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(4, ip-10-139-1-195.ec2.internal, 49221, 0) with no recent heart beats: 178406ms exceeds 45000ms
> 14/08/29 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver thread-3
> java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>     at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>     at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Exception in thread "Result resolver thread-3" 14/08/29 21:56:26 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014)
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>     at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>     at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>     at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>     at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> 14/08/29 21:54:43 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, ip-10-137-1-139.ec2.internal, 42539, 0) with no recent heart beats: 183978ms exceeds 45000ms
> 14/08/29 21:57:42 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-10-138-9-33.ec2.internal,41924) not found
> 14/08/29 21:57:51 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(11, ip-10-236-181-116.ec2.internal, 46847, 0) with no recent heart beats: 178629ms exceeds 45000ms
> 14/08/29 21:57:43 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-10-137-1-139.ec2.internal,42539) not found
> 14/08/29 21:57:54 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-141-136-168.ec2.internal,42960)
> java.nio.channels.ClosedChannelException
>     at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>     at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>     at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org