[
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645913#comment-14645913
]
Alex Slusarenko edited comment on SPARK-1239 at 7/29/15 12:00 PM:
------------------------------------------------------------------
Hi, all. We have faced this issue many times, and I've seen about a dozen
unanswered mailing-list threads where people hit the same problem. Currently we
have 250,000 map tasks and the same number of reduce tasks, running on 200
slave nodes. The driver has 80 GB of RAM. At first we observed an Akka
frame-size limit exception; after increasing the limit we see an OOM instead.
Here is the corresponding part of the log:
{noformat}
...
15/07/27 17:22:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 262144 tasks
15/07/27 17:22:57 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID
233766, 10.47.190.240, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID
233767, 10.145.26.133, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID
233768, 10.51.191.206, PROCESS_LOCAL, 1215 bytes)
...
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3197.0 in stage 1.0 (TID
236963, 10.99.197.178, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3198.0 in stage 1.0 (TID
236964, 10.65.148.16, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3199.0 in stage 1.0 (TID
236965, 10.123.204.224, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on
10.145.30.250:38441 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on
10.140.170.222:35810 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on
10.7.205.149:43761 (size: 3.8 KB, free: 4.1 GB)
...
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on
10.165.146.7:37388 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on
10.153.254.79:49517 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on
10.95.198.154:53675 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:24:41 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 0 is 166509346 bytes
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 0 to 10.109.157.235:39740
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 0 to 10.166.156.78:59382
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 0 to 10.152.41.131:47968
...
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 0 to 10.140.253.251:44621
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 0 to 10.153.254.79:42648
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 0 to 10.169.230.246:45473
15/07/27 17:25:31 INFO BlockManagerInfo: Removed broadcast_1_piece0 on
10.146.43.5:49989 in memory (size: 3.2 KB, free: 3.4 GB)
15/07/27 17:27:25 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-47] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:722)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
...
{noformat}
Do you need any additional info?
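A rough back-of-the-envelope sketch (my own estimate, assuming Spark's MapStatus encoding of roughly one log-scale size byte per (map task, reducer) pair; the 166509346-byte figure is taken from the log above) shows why this scale overwhelms the driver:

```python
num_maps = 262_144      # from "Adding task set 1.0 with 262144 tasks"
num_reducers = 262_144  # same number of reduce tasks

# MapStatus tracks one compressed (log-scale) size byte per reducer per map
# task, so the status table the driver must hold is on the order of:
raw_table_bytes = num_maps * num_reducers  # 2**36 bytes = 64 GiB of entries

# The log reports the compressed, serialized form of that table:
serialized_bytes = 166_509_346  # "Size of output statuses for shuffle 0"

# The tracker ships this whole blob to every reducer that asks for map output
# locations, so the driver pushes through Akka roughly:
total_sent = serialized_bytes * num_reducers  # ~39 TiB if every reducer asks

print(raw_table_bytes // 2**30, "GiB of raw entries;",
      total_sent // 2**40, "TiB shipped in total")
```

Even though the serialized blob is "only" ~159 MB, re-serializing it inside an Akka message for each of 262144 reducers is what exhausts the 80 GB driver heap in the stack trace above.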
> Don't fetch all map output statuses at each reducer during shuffles
> -------------------------------------------------------------------
>
> Key: SPARK-1239
> URL: https://issues.apache.org/jira/browse/SPARK-1239
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 1.0.2, 1.1.0
> Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a
> mapper and a reducer - or we should just piggyback the statuses on each task.
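The "take both a mapper and a reducer" idea above can be sketched as a per-reducer slice: instead of serializing the full all-reducers status table for every request, the tracker would return only the column a given reducer needs. A minimal illustration with hypothetical names (this is not Spark's actual API):

```python
def slice_for_reducer(statuses, reducer_id):
    """Return only the (map_id, block_size) pairs one reducer needs.

    `statuses` is a list of per-map rows, where row[r] holds the size of the
    shuffle block that map task produced for reducer r. The reply then scales
    with num_maps instead of num_maps * num_reducers.
    """
    return [(map_id, row[reducer_id]) for map_id, row in enumerate(statuses)]

# Tiny example: 3 map tasks, 2 reducers.
statuses = [[10, 20],
            [30, 40],
            [50, 60]]
print(slice_for_reducer(statuses, 1))  # [(0, 20), (1, 40), (2, 60)]
```

With 262144 maps this keeps each reply at a few hundred kilobytes rather than the ~159 MB blob reported in the log.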
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)