That definitely sounds more reliable. Worth trying out if there is a
reliable way of reproducing the deadlock-like scenario.
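
For what it's worth, here is a rough sketch of the ordering Matei describes below: stop the task pool first, then run the cleanup. It has not been tried against Spark itself. `stopPoolThenCleanUp`, `taskPool` and the `cleanup` callback are hypothetical names, not Spark's actual executor API, and it assumes the tasks actually respond to interruption.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object StopPoolFirstSketch {
  // Ask the pool to stop, wait up to `timeoutSeconds` for its threads to
  // exit, then run `cleanup`. Returns true if the pool fully terminated.
  def stopPoolThenCleanUp(taskPool: ExecutorService, timeoutSeconds: Long)(cleanup: () => Unit): Boolean = {
    taskPool.shutdownNow() // ThreadPoolExecutor interrupts its running worker threads
    val stopped = taskPool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)
    cleanup() // best effort: runs even if some task ignored the interrupt
    stopped
  }

  def main(args: Array[String]): Unit = {
    val taskPool = Executors.newFixedThreadPool(2)
    // Stand-in for a task that keeps writing until it is interrupted.
    taskPool.submit(new Runnable {
      def run(): Unit = {
        var n = 0L
        while (!Thread.currentThread().isInterrupted) n += 1
      }
    })
    val cleaned = new AtomicBoolean(false)
    val stopped = stopPoolThenCleanUp(taskPool, 5)(() => cleaned.set(true))
    println(s"pool stopped: $stopped, cleanup ran: ${cleaned.get}")
  }
}
```

In the real shutdown hook the `cleanup` callback would be the directory deletion, and the timeout keeps the hook from hanging if a task never notices the interrupt.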

TD


On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> I don't think we necessarily want to do this through the DAGScheduler
> because the worker might also shut down due to some unusual termination
> condition, like the driver node crashing. Can't we do it at the top of the
> shutdown hook instead? If all the threads are in the same thread pool it
> might be possible to interrupt or stop the whole pool.
>
> Matei
>
> On Feb 6, 2014, at 11:30 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> > That's genius.  Of course when a worker is told to shut down it should
> > interrupt its worker threads -- I think that would address this issue.
> >
> > Are you thinking to put
> >
> > running.map(_.jobId).foreach { handleJobCancellation }
> >
> > at the top of the StopDAGScheduler block?
> >
> >
> > On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> >
> >> It's highly likely that the threads in the executor's threadpool that
> >> runs the tasks are the only threads that write to disk. The tasks are
> >> designed to be interrupted when the corresponding job is cancelled. So a
> >> reasonably simple way could be to actually cancel the currently active
> >> jobs, which would send the signal to the worker to stop the tasks.
> >> Currently, the DAGScheduler
> >> <https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610>
> >> does not seem to actually cancel the jobs, only mark them as failed. So it
> >> may be a simple addition.
> >>
> >> There may be some complications with the external spilling of shuffle data
> >> to disk not stopping immediately when the task is marked for killing.
> >> Gotta try it out.
> >>
> >> TD
> >>
> >> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <and...@andrewash.com> wrote:
> >>
> >>> There is probably just one threadpool that has task threads -- is it
> >>> possible to enumerate and interrupt just those?  We may need to pass a
> >>> reference to that threadpool through to the shutdown thread to make that
> >>> happen.
> >>>
> >>>
> >>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mri...@gmail.com> wrote:
> >>>
> >>>> Ideally, interrupting the thread writing to disk should be sufficient
> >>>> - though since we are in the middle of a shutdown when this is happening,
> >>>> it is best effort anyway.
> >>>> Identifying which threads to interrupt will be interesting, since most
> >>>> of them are driven by threadpools and we can't list all threads and
> >>>> interrupt all of them!
> >>>>
> >>>>
> >>>> Regards,
> >>>> Mridul
> >>>>
> >>>>
> >>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <and...@andrewash.com> wrote:
> >>>>> I think the solution where we stop the writing threads and then let the
> >>>>> deleting threads completely clean up is the best option since the final
> >>>>> state doesn't have half-deleted temp dirs scattered across the cluster.
> >>>>>
> >>>>> How feasible do you think it'd be to interrupt the other threads?
> >>>>>
> >>>>>
> >>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mri...@gmail.com> wrote:
> >>>>>
> >>>>>> Looks like a pathological corner case here - where the delete
> >>>>>> thread is not getting run while the OS is busy prioritizing the thread
> >>>>>> writing data (probably with heavy gc too).
> >>>>>> Ideally, the delete thread would list files, remove them and then fail
> >>>>>> when it tries to remove the non-empty directory (since the other thread
> >>>>>> might be creating more in parallel).
> >>>>>>
> >>>>>>
> >>>>>> Regards,
> >>>>>> Mridul
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <and...@andrewash.com> wrote:
> >>>>>>> Got a repro locally on my MBP (the other was on a CentOS machine).
> >>>>>>>
> >>>>>>> Build spark, run a master and a worker with the sbin/start-all.sh script,
> >>>>>>> then run this in a shell:
> >>>>>>>
> >>>>>>> import org.apache.spark.storage.StorageLevel._
> >>>>>>> val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
> >>>>>>> s.count
> >>>>>>>
> >>>>>>> After about a minute, this line appears in the shell logging output:
> >>>>>>>
> >>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent heart beats: 57510ms exceeds 45000ms
> >>>>>>>
> >>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master, and a
> >>>>>>> CoarseGrainedExecutorBackend.
> >>>>>>>
> >>>>>>> Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
> >>>>>>> waited around for 15min then kill -9'd the JVM and restarted the process.
> >>>>>>>
> >>>>>>> I wonder if what's happening here is that the threads that are spewing data
> >>>>>>> to disk (as that parallelize and persist would do) can write to disk faster
> >>>>>>> than the cleanup threads can delete from disk.
> >>>>>>>
> >>>>>>> What do you think of that theory?
> >>>>>>>
> >>>>>>>
> >>>>>>> Andrew
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mri...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Shutdown hooks should not take 15 mins as you mentioned!
> >>>>>>>> On the other hand, how busy was your disk when this was happening?
> >>>>>>>> (either due to spark or something else?)
> >>>>>>>>
> >>>>>>>> It might just be that there was a lot of stuff to remove?
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Mridul
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <and...@andrewash.com> wrote:
> >>>>>>>>> Hi Spark devs,
> >>>>>>>>>
> >>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one
> >>>>>>>>> of my workers goes dead in the spark master UI.  I'm using the standalone
> >>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I think it may be
> >>>>>>>>> a regression.
> >>>>>>>>>
> >>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and
> >>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force flag.  The heap
> >>>>>>>>> isn't full, but there are some interesting bits in the jstack.  Poking
> >>>>>>>>> around a little, I think there may be some kind of deadlock in the
> >>>>>>>>> shutdown hooks.
> >>>>>>>>>
> >>>>>>>>> Below are the threads I think are most interesting:
> >>>>>>>>>
> >>>>>>>>> Thread 14308: (state = BLOCKED)
> >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
> >>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
> >>>>>>>>> - org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object, scala.Function1) @bci=352, line=81 (Interpreted frame)
> >>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498 (Interpreted frame)
> >>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456 (Interpreted frame)
> >>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237 (Interpreted frame)
> >>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
> >>>>>>>>> - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec() @bci=4, line=386 (Interpreted frame)
> >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260 (Compiled frame)
> >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask) @bci=10, line=1339 (Compiled frame)
> >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue) @bci=11, line=1979 (Compiled frame)
> >>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107 (Interpreted frame)
> >>>>>>>>>
> >>>>>>>>> Thread 3865: (state = BLOCKED)
> >>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> >>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
> >>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
> >>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106 (Interpreted frame)
> >>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted frame)
> >>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
> >>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
> >>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52 (Interpreted frame)
> >>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> >>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Thread 3987: (state = BLOCKED)
> >>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
> >>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> >>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> >>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1, line=466 (Interpreted frame)
> >>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9, line=478 (Compiled frame)
> >>>>>>>>> - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File) @bci=4, line=479 (Compiled frame)
> >>>>>>>>> - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object) @bci=5, line=478 (Compiled frame)
> >>>>>>>>> - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
> >>>>>>>>> - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2, line=34 (Compiled frame)
> >>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19, line=478 (Interpreted frame)
> >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File) @bci=14, line=141 (Interpreted frame)
> >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object) @bci=5, line=139 (Interpreted frame)
> >>>>>>>>> - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
> >>>>>>>>> - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Interpreted frame)
> >>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39, line=139 (Interpreted frame)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I think what happened here is that thread 14308 received the akka
> >>>>>>>>> "shutdown" message and called System.exit().  This started thread 3865,
> >>>>>>>>> which is the JVM shutting itself down.  Part of that process is running
> >>>>>>>>> the shutdown hooks, so it started thread 3987.  That thread is the
> >>>>>>>>> shutdown hook from addShutdownHook() in DiskBlockManager.scala, which
> >>>>>>>>> looks like this:
> >>>>>>>>>
> >>>>>>>>>  private def addShutdownHook() {
> >>>>>>>>>    localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
> >>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
> >>>>>>>>>      override def run() {
> >>>>>>>>>        logDebug("Shutdown hook called")
> >>>>>>>>>        localDirs.foreach { localDir =>
> >>>>>>>>>          try {
> >>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
> >>>>>>>>>          } catch {
> >>>>>>>>>            case t: Throwable =>
> >>>>>>>>>              logError("Exception while deleting local spark dir: " + localDir, t)
> >>>>>>>>>          }
> >>>>>>>>>        }
> >>>>>>>>>
> >>>>>>>>>        if (shuffleSender != null) {
> >>>>>>>>>          shuffleSender.stop()
> >>>>>>>>>        }
> >>>>>>>>>      }
> >>>>>>>>>    })
> >>>>>>>>>  }
> >>>>>>>>>
> >>>>>>>>> It goes through and deletes the directories recursively.  I was thinking
> >>>>>>>>> there might be some issues with concurrently-running shutdown hooks
> >>>>>>>>> deleting things out from underneath each other (shutdown hook javadocs
> >>>>>>>>> say they're all started in parallel if multiple hooks are added) causing
> >>>>>>>>> the File.list() in that last thread to take quite some time.
> >>>>>>>>>
> >>>>>>>>> While I was looking through the stacktrace the JVM finally exited (after
> >>>>>>>>> 15-20min at least) so I won't be able to debug more until this bug
> >>>>>>>>> strikes again.
> >>>>>>>>>
> >>>>>>>>> Any ideas on what might be going on here?
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Andrew

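A minimal sketch of the delete behaviour Mridul describes earlier in the thread (list the files, remove them, then fail on the directory if it is no longer empty), as opposed to chasing files that a writer keeps creating. `deleteOnce` is a hypothetical helper, not Spark's `Utils.deleteRecursively`, and it does not treat symlinks specially.

```scala
import java.io.File
import java.nio.file.Files

object SinglePassDeleteSketch {
  // One pass: delete whatever children are listed now, then try the directory.
  // Returns false (instead of retrying) if the directory could not be removed,
  // e.g. because another thread added files after the listing was taken.
  def deleteOnce(f: File): Boolean = {
    // listFiles() returns null for non-directories or on I/O error.
    val children = Option(f.listFiles()).getOrElse(Array.empty[File])
    children.foreach(deleteOnce)
    f.delete() // File.delete() returns false for a non-empty directory
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("delete-sketch").toFile
    new File(dir, "a").createNewFile()
    val sub = new File(dir, "sub")
    sub.mkdir()
    new File(sub, "b").createNewFile()
    println(s"removed: ${deleteOnce(dir)}")
  }
}
```

Because the pass is bounded by a single listing per directory, the shutdown hook returns promptly with a failure instead of running for as long as the writers stay ahead of it.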