On re running the cache statement, from the logs I see that when
collect(stage 1) fails it always leads to mapPartition(stage 0) for one
partition to be re-run. This can be seen from the collect log as well on
the container log:

rg.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0

The data is lzo compressed sequence file with compressed size ~ 26G. Is
there a way to understand why shuffle keeps failing for one partition. I
believe we have enough memory to store the uncompressed data in memory.

On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood <sadhan.s...@gmail.com> wrote:

> This is the log output:
>
> 2014-11-12 19:07:16,561 INFO  thriftserver.SparkExecuteStatementOperation
> (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
> SELECT * FROM xyz where date_prefix = 20141112'
>
> 2014-11-12 19:07:17,455 INFO  Configuration.deprecation
> (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
> deprecated. Instead, use mapreduce.job.maps
>
> 2014-11-12 19:07:17,756 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
> TableReader.scala:68
>
> 2014-11-12 19:07:18,292 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
>
> 2014-11-12 19:07:22,801 INFO  mapred.FileInputFormat
> (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
>
> 2014-11-12 19:07:22,835 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
> Exchange.scala:86)
>
> 2014-11-12 19:07:22,837 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
> with 1 output partitions (allowLocal=false)
>
> 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
> SparkPlan.scala:84)
>
> 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
>
> 2014-11-12 19:07:22,842 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
>
> 2014-11-12 19:07:22,871 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
> mapPartitions at Exchange.scala:86), which has no missing parents
>
> 2014-11-12 19:07:22,916 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:07:22,963 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
> (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
>
> 2014-11-12 19:10:04,088 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
> finished in 161.113 s
>
> 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - looking for newly runnable stages
>
> 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - running: Set()
>
> 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
>
> 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - failed: Set()
>
> 2014-11-12 19:10:04,094 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
>
> 2014-11-12 19:10:04,097 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
> SparkPlan.scala:84), which is now runnable
>
> 2014-11-12 19:10:04,112 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:10:04,115 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
> (MappedRDD[16] at map at SparkPlan.scala:84)
>
> 2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
> (Logging.scala:logError(75)) - Lost executor 52 on
> ip-10-61-175-167.ec2.internal: remote Akka client disassociated
>
> 2014-11-12 19:10:08,543 WARN  remote.ReliableDeliverySupervisor
> (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
> [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> 2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
> (Logging.scala:logError(75)) - Asked to remove non-existent executor 52
>
> 2014-11-12 19:10:08,550 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
>
> 2014-11-12 19:10:08,555 INFO  scheduler.Stage (Logging.scala:logInfo(59))
> - Stage 0 is now unavailable on executor 52 (460/461, false)
>
> 2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at
> SparkPlan.scala:84) as failed due to a fetch failure from Stage 0
> (mapPartitions at Exchange.scala:86)
>
> 2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84)
> failed in 4.571 s
>
> 2014-11-12 19:10:08,687 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at
> Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch
> failure
>
> 2014-11-12 19:10:08,908 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Resubmitting failed stages
>
> 2014-11-12 19:10:08,974 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
> mapPartitions at Exchange.scala:86), which has no missing parents
>
> 2014-11-12 19:10:08,989 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:10:08,990 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0
> (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
> finished in 66.475 s
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - looking for newly runnable stages
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - running: Set()
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - failed: Set()
>
> 2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
>
> 2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
> SparkPlan.scala:84), which is now runnable
>
> 2014-11-12 19:11:15,482 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:11:15,482 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
> (MappedRDD[16] at map at SparkPlan.scala:84)
>
> 2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler
> (Logging.scala:logError(75)) - Lost executor 372 on
> ip-10-95-163-84.ec2.internal: remote Akka client disassociated
>
> 2014-11-12 19:11:21,655 WARN  remote.ReliableDeliverySupervisor
> (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
> [akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed,
> address is now gated for [5000] ms. Reason is: [Disassociated].
>
> 2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend
> (Logging.scala:logError(75)) - Asked to remove non-existent executor 372
>
> 2014-11-12 19:11:21,655 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)
>
>
>
>
> On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood <sadhan.s...@gmail.com>
> wrote:
>
>> We are running spark on yarn with combined memory > 1TB and when trying
>> to cache a table partition(which is < 100G), seeing a lot of failed collect
>> stages in the UI and this never succeeds. Because of the failed collect, it
>> seems like the mapPartitions keep getting resubmitted. We have more than
>> enough memory so its surprising we are seeing this issue. Can someone
>> please help. Thanks!
>>
>> The stack trace of the failed collect from UI is:
>>
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
>> location for shuffle 0
>>      at 
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
>>      at 
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
>>      at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>      at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>      at 
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>      at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>      at 
>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
>>      at 
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
>>      at 
>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>      at 
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>>      at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>      at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>      at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>      at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>      at java.lang.Thread.run(Thread.java:745)
>>
>>
>

Reply via email to