[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645913#comment-14645913
 ] 

Alex Slusarenko commented on SPARK-1239:
----------------------------------------

Hi, all. We have faced this issue many times. Currently, we have 250 000 map 
tasks and the same amount of reduce tasks. We have 200 slave nodes. The driver 
has 80 G RAM. First we observed akka frame size limit exception and after 
increasing the limit we see OOM. Here is the corresponding part of the log:
{noformat}
...
15/07/27 17:22:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 262144 tasks
15/07/27 17:22:57 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 
233766, 10.47.190.240, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 
233767, 10.145.26.133, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 
233768, 10.51.191.206, PROCESS_LOCAL, 1215 bytes)
...
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3197.0 in stage 1.0 (TID 
236963, 10.99.197.178, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3198.0 in stage 1.0 (TID 
236964, 10.65.148.16, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3199.0 in stage 1.0 (TID 
236965, 10.123.204.224, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.145.30.250:38441 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.140.170.222:35810 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.7.205.149:43761 (size: 3.8 KB, free: 4.1 GB)
...
5/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.165.146.7:37388 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.153.254.79:49517 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.95.198.154:53675 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:24:41 INFO MapOutputTrackerMaster: Size of output statuses for 
shuffle 0 is 166509346 bytes
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.109.157.235:39740
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.166.156.78:59382
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.152.41.131:47968
...
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.140.253.251:44621
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.153.254.79:42648
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.169.230.246:45473
15/07/27 17:25:31 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 
10.146.43.5:49989 in memory (size: 3.2 KB, free: 3.4 GB)
15/07/27 17:27:25 ERROR ActorSystemImpl: Uncaught fatal error from thread 
[sparkDriver-akka.remote.default-remote-dispatcher-47] shutting down 
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
        at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
        at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
        at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747)
        at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:722)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
...
{noformat}

Do you need any additional info?

> Don't fetch all map output statuses at each reducer during shuffles
> -------------------------------------------------------------------
>
>                 Key: SPARK-1239
>                 URL: https://issues.apache.org/jira/browse/SPARK-1239
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.0.2, 1.1.0
>            Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to