Hi Imran,

Thanks for the advice; tweaking some akka parameters helped. See below.

Now we've noticed that we get Java heap OOM exceptions in the map output
tracker when we have too many tasks. I wonder:
1. where does the map output tracker live? The driver? The master (when
those are not the same)?
2. how can we increase the heap for it? Especially when using spark-submit?
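
If it is indeed the driver, my assumption would be that the relevant knob is
simply the driver heap at launch time, e.g. (16g is only a placeholder
value):

spark-submit --driver-memory 16g ...
# or, equivalently, in spark-defaults.conf:
spark.driver.memory 16g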

Thanks,
Thomas

PS: akka parameters that one might want to increase:
# akka timeouts/heartbeats settings multiplied by 10 to avoid problems
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 60000
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 10000

# Hidden akka conf to avoid MapOutputTracker timeouts
# See https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
spark.akka.askTimeout 300
spark.akka.lookupTimeout 300
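
For completeness, the same keys can also be set programmatically on the
SparkConf before the SparkContext is created; a minimal sketch with the same
values as above:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.akka.askTimeout", "300")
  .set("spark.akka.lookupTimeout", "300")
  .set("spark.akka.timeout", "1000")
  .set("spark.akka.heartbeat.pauses", "60000")
  .set("spark.akka.failure-detector.threshold", "3000.0")
  .set("spark.akka.heartbeat.interval", "10000")
val sc = new SparkContext(conf)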

On Fri, Mar 20, 2015 at 9:18 AM, Imran Rashid <iras...@cloudera.com> wrote:

> Hi Thomas,
>
> Sorry for such a late reply.  I don't have any super-useful advice, but
> this seems like something that is important to follow up on.  To answer
> your immediate question: no, there should not be any hard limit to the
> number of tasks that MapOutputTracker can handle, though of course as
> things get bigger the overheads increase, which is why you might hit
> timeouts.
>
> Two other minor suggestions:
> (1) increase spark.akka.askTimeout -- that's the timeout you are running
> into; it defaults to 30 seconds
> (2) as you've noted, you've needed to play w/ other timeouts b/c of long
> GC pauses -- it's possible some GC tuning might help, though it's a bit of
> a black art, so it's hard to say what you can try.  You could always try
> Concurrent Mark Sweep (CMS) to avoid the long pauses, but of course that
> will probably hurt overall performance.
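>
> A rough sketch of what enabling CMS could look like, purely as an
> illustration (the properties and flags below are assumptions to be tuned,
> not a tested recommendation):
>
> spark.executor.extraJavaOptions -XX:+UseConcMarkSweepGC
> spark.driver.extraJavaOptions -XX:+UseConcMarkSweepGC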
>
> can you share any more details of what you are trying to do?
>
> Since you're fetching shuffle blocks in a shuffle map task, I guess you've
> got two shuffles back-to-back, e.g.
> someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}.  Do you
> expect to be doing a lot of GC in between the two shuffles?  E.g., in the
> little example I have, lots of objects created in the map & filter steps
> that make it out of the eden space would lead to exactly those long GC
> pauses.  One possible solution would be to force the first shuffle to
> complete before running any of the subsequent transformations, e.g. by
> forcing materialization to the cache first:
>
> // needs: import org.apache.spark.storage.StorageLevel
> val intermediateRDD = someRDD.reduceByKey{...}.persist(StorageLevel.DISK_ONLY)
> // force the shuffle to complete, without trying to do our complicated
> // downstream logic at the same time
> intermediateRDD.count()
>
> val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}
>
> Also, can you share your data size?  Do you expect the shuffle to be
> skewed, or do you think it will be well-balanced?  Not that I'll have any
> suggestions for you based on the answer, but it may help us reproduce it
> and try to fix whatever the root cause is.
>
> thanks,
> Imran
>
>
>
> On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber <thomas.ger...@radius.com>
> wrote:
>
>> I meant spark.default.parallelism of course.
>>
>> On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber <thomas.ger...@radius.com>
>> wrote:
>>
>>> Follow up:
>>> We retried again, this time after *decreasing* spark.parallelism. It was
>>> set to 16000 before (5 times the number of cores in our cluster); it is
>>> now down to 6400 (2 times the number of cores).
>>>
>>> And it got past the point where it failed before.
>>>
>>> Does the MapOutputTracker have a limit on the number of tasks it can
>>> track?
>>>
>>>
>>> On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber <thomas.ger...@radius.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> We are using Spark 1.2.1 on a very large cluster (100 c3.8xlarge
>>>> workers). We use spark-submit to start an application.
>>>>
>>>> We got the following error which leads to a failed stage:
>>>>
>>>> Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
>>>> most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
>>>> ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
>>>> communicating with MapOutputTracker
>>>>
>>>>
>>>> We tried the whole application again, and it failed on the same stage
>>>> (but it got more tasks completed on that stage) with the same error.
>>>>
>>>> We then looked at the executors' stderr, and all show similar logs on
>>>> both runs (see below). As far as we can tell, the executors and the
>>>> master have disk space left.
>>>>
>>>> *Any suggestion on where to look to understand why the communication
>>>> with the MapOutputTracker fails?*
>>>>
>>>> Thanks
>>>> Thomas
>>>> ====
>>>> In case it matters, our akka settings:
>>>> spark.akka.frameSize 50
>>>> spark.akka.threads 8
>>>> // those below are 10* the default, to cope with large GCs
>>>> spark.akka.timeout 1000
>>>> spark.akka.heartbeat.pauses 60000
>>>> spark.akka.failure-detector.threshold 3000.0
>>>> spark.akka.heartbeat.interval 10000
>>>>
>>>> Appendix: executor logs, where it starts going awry
>>>>
>>>> 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 
>>>> 298525
>>>> 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
>>>> 298525)
>>>> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
>>>> curMem=5543008799, maxMem=18127202549
>>>> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
>>>> bytes in memory (estimated size 1473.0 B, free 11.7 GB)
>>>> 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
>>>> broadcast_339_piece0
>>>> 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 
>>>> took 224 ms
>>>> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
>>>> curMem=5543010272, maxMem=18127202549
>>>> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values 
>>>> in memory (estimated size 2.5 KB, free 11.7 GB)
>>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>>>> shuffle 18, fetching them
>>>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
>>>> actor = 
>>>> Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
>>>> [... the "Don't have map outputs for shuffle 18, fetching them" line
>>>> repeats ~30 more times at 11:45:00 ...]
>>>> 15/03/04 11:45:30 ERROR MapOutputTrackerWorker: Error communicating with 
>>>> MapOutputTracker
>>>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>    at 
>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>    at 
>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>    at scala.concurrent.Await$.result(package.scala:107)
>>>>    at 
>>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:112)
>>>>    at 
>>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:163)
>>>>    at 
>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>>>    at 
>>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>>>>    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>    at 
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>    at 
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>    at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>    at java.lang.Thread.run(Thread.java:745)
>>>> 15/03/04 11:45:30 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
>>>> actor = 
>>>> Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
>>>> 15/03/04 11:45:30 ERROR Executor: Exception in task 32.0 in stage 140.0 
>>>> (TID 295474)
>>>> org.apache.spark.SparkException: Error communicating with MapOutputTracker
>>>>    at 
>>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:116)
>>>>    at 
>>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:163)
>>>>    at 
>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>>>
>>>> ===
>>>> and then later a lot of those:
>>>> ===
>>>>
>>>> 15/03/04 11:51:50 ERROR TransportRequestHandler: Error sending result 
>>>> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=29906093434, 
>>>> chunkIndex=25}, 
>>>> buffer=FileSegmentManagedBuffer{file=/mnt/spark/spark-3f8c4cbe-a1f8-4a66-ac17-0a3d3daaffaf/spark-92cb6108-35af-4ad0-82f6-ac904b677eff/spark-8fc6043c-df95-4c48-9215-5b9907014b55/spark-99219c49-778b-4b5f-8454-24d2d3b82b81/0d/shuffle_18_6718_0.data,
>>>>  offset=182070, length=166}} to /10.0.12.24:33174; closing connection
>>>> java.nio.channels.ClosedChannelException
>>>>
>>>>
>>>
>>
>
