That definitely sounds more reliable. Worth trying out if there is a reliable way of reproducing the deadlock-like scenario.
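Something like this minimal sketch of keeping a handle to the task pool and stopping it from its own shutdown hook -- the object and names below are invented for illustration, not the actual fields in Executor:

import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

// Hypothetical holder for the executor's task threadpool; the real code would
// keep a handle to whatever pool actually runs the task threads.
object TaskThreads {
  val pool: ExecutorService = Executors.newCachedThreadPool()

  Runtime.getRuntime.addShutdownHook(new Thread("interrupt task threads") {
    override def run() {
      // Interrupt the running tasks so they stop writing to disk, then give
      // them a short grace period before the delete-local-dirs hook starts
      // removing the same directories out from under them.
      pool.shutdownNow()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
  })
}

If that works, the delete hook would mostly be racing against already-interrupted writers rather than live ones.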
TD

On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> I don't think we necessarily want to do this through the DAGScheduler, because the worker might also shut down due to some unusual termination condition, like the driver node crashing. Can't we do it at the top of the shutdown hook instead? If all the threads are in the same thread pool it might be possible to interrupt or stop the whole pool.
>
> Matei
>
> On Feb 6, 2014, at 11:30 PM, Andrew Ash <and...@andrewash.com> wrote:
>
>> That's genius. Of course when a worker is told to shut down it should interrupt its worker threads -- I think that would address this issue.
>>
>> Are you thinking to put
>>
>> running.map(_.jobId).foreach { handleJobCancellation }
>>
>> at the top of the StopDAGScheduler block?
>>
>>
>> On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> It's highly likely that the executor threadpool that runs the tasks is the only set of threads that writes to disk. The tasks are designed to be interrupted when the corresponding job is cancelled, so a reasonably simple way could be to actually cancel the currently active jobs, which would send the signal to the worker to stop the tasks. Currently, the DAGScheduler <https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610> does not seem to actually cancel the jobs, only mark them as failed, so it may be a simple addition.
>>>
>>> There may be some complications with the external spilling of shuffle data to disk not stopping immediately when the task is marked for killing. Gotta try it out.
>>>
>>> TD
>>>
>>> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <and...@andrewash.com> wrote:
>>>
>>>> There is probably just one threadpool that has task threads -- is it possible to enumerate and interrupt just those? We may need to string a reference to that threadpool through to the shutdown thread to make that happen.
>>>>
>>>>
>>>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mri...@gmail.com> wrote:
>>>>
>>>>> Ideally, interrupting the thread writing to disk should be sufficient -- though since we are in the middle of shutdown when this is happening, it is best-effort anyway. Identifying which threads to interrupt will be interesting, since most of them are driven by threadpools and we can't list all threads and interrupt all of them!
>>>>>
>>>>> Regards,
>>>>> Mridul
>>>>>
>>>>>
>>>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <and...@andrewash.com> wrote:
>>>>>
>>>>>> I think the solution where we stop the writing threads and then let the deleting threads completely clean up is the best option, since the final state doesn't have half-deleted temp dirs scattered across the cluster.
>>>>>>
>>>>>> How feasible do you think it'd be to interrupt the other threads?
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mri...@gmail.com> wrote:
>>>>>>
>>>>>>> Looks like a pathological corner case here -- the delete thread is not getting scheduled while the OS is busy prioritizing the thread writing data (probably with heavy GC too).
>>>>>>> Ideally, the delete thread would list files, remove them, and then fail when it tries to remove the non-empty directory (since the other thread might be creating more files in parallel).
>>>>>>>
>>>>>>> Regards,
>>>>>>> Mridul
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <and...@andrewash.com> wrote:
>>>>>>>
>>>>>>>> Got a repro locally on my MBP (the other was on a CentOS machine).
>>>>>>>>
>>>>>>>> Build Spark, run a master and a worker with the sbin/start-all.sh script, then run this in a shell:
>>>>>>>>
>>>>>>>> import org.apache.spark.storage.StorageLevel._
>>>>>>>> val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
>>>>>>>> s.count
>>>>>>>>
>>>>>>>> After about a minute, this line appears in the shell logging output:
>>>>>>>>
>>>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent heart beats: 57510ms exceeds 45000ms
>>>>>>>>
>>>>>>>> Ctrl-C the shell. In jps there is now a worker, a master, and a CoarseGrainedExecutorBackend.
>>>>>>>>
>>>>>>>> Run jstack on the CGEBackend JVM, and I got the attached stacktraces. I waited around for 15 min, then kill -9'd the JVM and restarted the process.
>>>>>>>>
>>>>>>>> I wonder if what's happening here is that the threads that are spewing data to disk (as that parallelize and persist would do) can write to disk faster than the cleanup threads can delete from disk.
>>>>>>>>
>>>>>>>> What do you think of that theory?
>>>>>>>>
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mri...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Shutdown hooks should not take 15 mins as you mentioned!
>>>>>>>>> On the other hand, how busy was your disk when this was happening? (Either due to Spark or something else?)
>>>>>>>>>
>>>>>>>>> It might just be that there was a lot of stuff to remove?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Mridul
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <and...@andrewash.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Spark devs,
>>>>>>>>>>
>>>>>>>>>> Occasionally, when hitting Ctrl-C in the Scala Spark shell on 0.9.0, one of my workers goes dead in the Spark master UI. I'm using the standalone cluster and didn't ever see this while using 0.8.0, so I think it may be a regression.
>>>>>>>>>>
>>>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and jmap -heap, it doesn't respond unless I add the -F force flag. The heap isn't full, but there are some interesting bits in the jstack. Poking around a little, I think there may be some kind of deadlock in the shutdown hooks.
>>>>>>>>>>
>>>>>>>>>> Below are the threads I think are most interesting:
>>>>>>>>>>
>>>>>>>>>> Thread 14308: (state = BLOCKED)
>>>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>>>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
>>>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
>>>>>>>>>> - org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object, scala.Function1) @bci=352, line=81 (Interpreted frame)
>>>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498 (Interpreted frame)
>>>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456 (Interpreted frame)
>>>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237 (Interpreted frame)
>>>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
>>>>>>>>>> - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec() @bci=4, line=386 (Interpreted frame)
>>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260 (Compiled frame)
>>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask) @bci=10, line=1339 (Compiled frame)
>>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue) @bci=11, line=1979 (Compiled frame)
>>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107 (Interpreted frame)
>>>>>>>>>>
>>>>>>>>>> Thread 3865: (state = BLOCKED)
>>>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>>>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
>>>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
>>>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106 (Interpreted frame)
>>>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted frame)
>>>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
>>>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
>>>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>>>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52 (Interpreted frame)
>>>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
>>>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>>>>>>>>>>
>>>>>>>>>> Thread 3987: (state = BLOCKED)
>>>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
>>>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>>>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>>>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1, line=466 (Interpreted frame)
>>>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9, line=478 (Compiled frame)
>>>>>>>>>> - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File) @bci=4, line=479 (Compiled frame)
>>>>>>>>>> - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object) @bci=5, line=478 (Compiled frame)
>>>>>>>>>> - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>>>>> - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2, line=34 (Compiled frame)
>>>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19, line=478 (Interpreted frame)
>>>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File) @bci=14, line=141 (Interpreted frame)
>>>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object) @bci=5, line=139 (Interpreted frame)
>>>>>>>>>> - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>>>>> - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Interpreted frame)
>>>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39, line=139 (Interpreted frame)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think what happened here is that thread 14308 received the akka "shutdown" message and called System.exit(). This started thread 3865, which is the JVM shutting itself down. Part of that process is running the shutdown hooks, so it started thread 3987. That thread is the shutdown hook from addShutdownHook() in DiskBlockManager.scala, which looks like this:
>>>>>>>>>>
>>>>>>>>>> private def addShutdownHook() {
>>>>>>>>>>   localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
>>>>>>>>>>   Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
>>>>>>>>>>     override def run() {
>>>>>>>>>>       logDebug("Shutdown hook called")
>>>>>>>>>>       localDirs.foreach { localDir =>
>>>>>>>>>>         try {
>>>>>>>>>>           if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
>>>>>>>>>>         } catch {
>>>>>>>>>>           case t: Throwable =>
>>>>>>>>>>             logError("Exception while deleting local spark dir: " + localDir, t)
>>>>>>>>>>         }
>>>>>>>>>>       }
>>>>>>>>>>
>>>>>>>>>>       if (shuffleSender != null) {
>>>>>>>>>>         shuffleSender.stop()
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>   })
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> It goes through and deletes the directories recursively.
>>>>>>>>>> I was thinking there might be some issues with concurrently-running shutdown hooks deleting things out from underneath each other (the shutdown hook javadocs say they're all started in parallel if multiple hooks are added), causing the File.list() in that last thread to take quite some time.
>>>>>>>>>>
>>>>>>>>>> While I was looking through the stacktrace the JVM finally exited (after 15-20 min at least), so I won't be able to debug more until this bug strikes again.
>>>>>>>>>>
>>>>>>>>>> Any ideas on what might be going on here?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Andrew
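To make Mridul's point above concrete: a best-effort version of the recursive delete in the shutdown hook could simply tolerate the race instead of retrying. This is purely a sketch with invented names, not a drop-in replacement for Utils.deleteRecursively:

import java.io.File

object ShutdownDeleteSketch {
  // Best-effort recursive delete for use in a shutdown hook: if another
  // thread is still creating files under dir, the final delete() just
  // returns false and we move on rather than waiting for the writers to stop.
  def bestEffortDelete(dir: File): Boolean = {
    val children = dir.listFiles()  // may be null if the dir vanished or listing failed
    if (children != null) {
      children.foreach { f =>
        if (f.isDirectory) bestEffortDelete(f) else f.delete()
      }
    }
    dir.delete()  // false if new files appeared in the meantime
  }
}

Whether that is acceptable depends on whether we are okay leaving a partially-deleted local dir behind when writers outlive the hook, which is the trade-off Andrew mentions above.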