[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353651#comment-16353651 ]

Eyal Farago commented on SPARK-19870:
-------------------------------------

We've also seen something very similar (stack traces attached) with Spark 2.1.0. In our 
case it happened after several OOM conditions and executors being killed and 
restarted (a new AWS account was limited on the instance type we chose).

I've stared at these stack traces a lot and walked along the block manager 
(and block info manager) code paths. My conclusion was that a thread leaks a 
writer lock for at least one block. Further investigation shows that tasks are 
registered for cleanup, which should eliminate the potential problem (the block 
info manager maintains maps of readers/writers per block, and a task's locks are 
released when the task completes).

However, there are other threads living inside a Spark executor, such as the 
BlockTransferService threads, that are not tasks and hence have no task ID and 
no guaranteed cleanup...
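
To illustrate what I mean, here is a heavily simplified, hypothetical sketch (not the 
actual BlockInfoManager code, and the names are made up) of the per-block 
reader/writer bookkeeping and the per-task release that the cleanup relies on. A 
thread without a task ID simply has nothing that triggers that release on its behalf:
{code:scala}
import scala.collection.mutable

// Hypothetical, stripped-down model of the bookkeeping described above.
// The real BlockInfoManager also blocks via wait/notify, supports lock
// downgrades, etc.; this only shows why the per-task cleanup matters.
class SimpleBlockLockRegistry {
  // blockId -> task that holds the write lock
  private val writerByBlock = mutable.Map[String, Long]()
  // blockId -> (taskId -> number of read locks held)
  private val readersByBlock = mutable.Map[String, mutable.Map[Long, Int]]()

  def tryLockForWriting(blockId: String, taskId: Long): Boolean = synchronized {
    val hasReaders = readersByBlock.get(blockId).exists(_.nonEmpty)
    if (writerByBlock.contains(blockId) || hasReaders) false
    else { writerByBlock(blockId) = taskId; true }
  }

  def tryLockForReading(blockId: String, taskId: Long): Boolean = synchronized {
    if (writerByBlock.contains(blockId)) false
    else {
      val readers = readersByBlock.getOrElseUpdate(blockId, mutable.Map.empty)
      readers(taskId) = readers.getOrElse(taskId, 0) + 1
      true
    }
  }

  // The cleanup that task completion relies on: every lock the task still
  // holds is dropped, so a failed task cannot leave a block locked forever.
  // A non-task thread (e.g. a BlockTransferService handler) never gets this
  // call, so any lock it leaks stays leaked.
  def releaseAllLocksForTask(taskId: Long): Unit = synchronized {
    val written = writerByBlock.collect { case (b, t) if t == taskId => b }
    written.foreach(writerByBlock.remove)
    readersByBlock.values.foreach(_.remove(taskId))
    notifyAll()
  }
}
{code}
As far as I can tell the real cleanup is keyed by task attempt ID and runs when the 
task finishes, which is exactly why the non-task threads above worry me.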

I think I found a potential weak spot in the way Spark evicts blocks from 
memory (org.apache.spark.storage.memory.MemoryStore#evictBlocksToFreeSpace): it 
can keep holding write locks on candidate blocks if a failure occurs mid-eviction. 
SPARK-22083 seems to fix this, so I'd recommend trying one of the versions that 
include the fix (we're currently checking whether we can do so with the distros 
we're using).
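
To make the suspected weak spot concrete, here is a hypothetical sketch of the 
eviction pattern, not the actual MemoryStore code; tryLockForWriting, sizeOf, 
dropBlock and releaseWriteLock are made-up stand-ins for the BlockInfoManager and 
eviction-handler calls. The selection loop write-locks every candidate up front, so 
if dropping one of them throws, the remaining candidates have to be unlocked 
explicitly, which is roughly what the SPARK-22083 change adds as far as I can tell:
{code:scala}
object EvictionSketch {
  import scala.collection.mutable.ArrayBuffer

  def evictBlocksToFreeSpace(
      space: Long,
      candidates: Iterator[String],
      tryLockForWriting: String => Boolean,
      sizeOf: String => Long,
      dropBlock: String => Unit,        // assumed to release the dropped block's lock
      releaseWriteLock: String => Unit): Long = {
    val selected = ArrayBuffer[String]()
    var freed = 0L

    // Phase 1: pick candidates, taking a write lock on each selected block.
    while (freed < space && candidates.hasNext) {
      val blockId = candidates.next()
      if (tryLockForWriting(blockId)) {
        selected += blockId
        freed += sizeOf(blockId)
      }
    }

    if (freed >= space) {
      // Phase 2: drop the selected blocks. dropBlock may throw, e.g. under
      // memory pressure; that is the failure mode I suspect we hit.
      var dropped = 0
      try {
        selected.foreach { blockId =>
          dropBlock(blockId)
          dropped += 1
        }
        freed
      } finally {
        // Without this finally block (the gist of the SPARK-22083 fix, as I
        // read it), an exception above leaves every not-yet-dropped candidate
        // write-locked forever, and later readers/writers of those blocks wait
        // inside BlockInfoManager exactly like the threads in the dumps below.
        selected.drop(dropped).foreach(releaseWriteLock)
      }
    } else {
      // Not enough space can be freed: release everything we locked and bail.
      selected.foreach(releaseWriteLock)
      0L
    }
  }
}
{code}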

 

[~irashid], you fixed SPARK-22083. Do the scenarios described in this issue 
resemble what you were fixing? Does my analysis make sense to you?

 

attaching my stack traces:
{noformat}
Thread ID       Thread Name     Thread State    Thread Locks

148     Executor task launch worker for task 75639      WAITING 
Lock(java.util.concurrent.ThreadPoolExecutor$Worker@155076597})
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450)
org.apache.spark.storage.BlockManager.get(BlockManager.scala:636)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:693)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

168     Executor task launch worker for task 75642      WAITING 
Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1540715596})
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450)
org.apache.spark.storage.BlockManager.get(BlockManager.scala:636)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:693)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

175     block-manager-slave-async-thread-pool-35        WAITING 
Lock(java.util.concurrent.ThreadPoolExecutor$Worker@320586055})
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:236)
org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1370)
org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1361)
org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1361)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1361)
org.apache.spark.storage.BlockManagerSlaveEndpoint$$anonfun$receiveAndReply$1$$anonfun$applyOrElse$4.apply$mcI$sp(BlockManagerSlaveEndpoint.scala:66)
org.apache.spark.storage.BlockManagerSlaveEndpoint$$anonfun$receiveAndReply$1$$anonfun$applyOrElse$4.apply(BlockManagerSlaveEndpoint.scala:66)
org.apache.spark.storage.BlockManagerSlaveEndpoint$$anonfun$receiveAndReply$1$$anonfun$applyOrElse$4.apply(BlockManagerSlaveEndpoint.scala:66)
org.apache.spark.storage.BlockManagerSlaveEndpoint$$anonfun$1.apply(BlockManagerSlaveEndpoint.scala:82)
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{noformat}

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> ------------------------------------------------------------
>
>                 Key: SPARK-19870
>                 URL: https://issues.apache.org/jira/browse/SPARK-19870
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Shuffle
>    Affects Versions: 2.0.2, 2.1.0
>         Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>            Reporter: Steven Ruppert
>            Priority: Major
>         Attachments: stack.txt
>
>
> I'm running what I believe to be a fairly vanilla Spark job using the RDD API, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to Parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x00007fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x00007fffb95f3000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
>         - waiting to lock <0x00000005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>         - locked <0x00000005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x00007fffd88d0000 nid=0x1022b8 in Object.wait() [0x00007fffb96f4000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Object.wait(Object.java:502)
>         at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
>         - locked <0x0000000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
>         at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
>         - locked <0x00000005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>         - locked <0x000000059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>         at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {noformat}
> A full stack trace is attached, but those seem to be the offending threads.
> This happens across several different executors, and has persisted through 
> several runs of the same job across spark 2.1.0 and 2.0.2. I also tried 
> killing individual executors to "unstick" the job, to no avail.
> I haven't yet narrowed down the job itself to something publicly repeatable, 
> but hopefully the stacktraces are enough to start debugging.


