Ideally, interrupting the thread writing to disk should be sufficient
- though since we are in middle of shutdown when this is happening, it
is best case effort anyway.
Identifying which threads to interrupt will be interesting since most
of them are driven by threadpool's and we cant list all threads and
interrupt all of them !


Regards,
Mridul


On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <and...@andrewash.com> wrote:
> I think the solution where we stop the writing threads and then let the
> deleting threads completely clean up is the best option since the final
> state doesn't have half-deleted temp dirs scattered across the cluster.
>
> How feasible do you think it'd be to interrupt the other threads?
>
>
> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mri...@gmail.com>wrote:
>
>> Looks like a pathological corner case here - where the the delete
>> thread is not getting run while the OS is busy prioritizing the thread
>> writing data (probably with heavy gc too).
>> Ideally, the delete thread would list files, remove them and then fail
>> when it tries to remove the non empty directory (since other thread
>> might be creating more in parallel).
>>
>>
>> Regards,
>> Mridul
>>
>>
>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <and...@andrewash.com> wrote:
>> > Got a repro locally on my MBP (the other was on a CentOS machine).
>> >
>> > Build spark, run a master and a worker with the sbin/start-all.sh script,
>> > then run this in a shell:
>> >
>> > import org.apache.spark.storage.StorageLevel._
>> > val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
>> > s.count
>> >
>> > After about a minute, this line appears in the shell logging output:
>> >
>> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
>> > BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent
>> heart
>> > beats: 57510ms exceeds 45000ms
>> >
>> > Ctrl-C the shell.  In jps there is now a worker, a master, and a
>> > CoarseGrainedExecutorBackend.
>> >
>> > Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
>> > waited around for 15min then kill -9'd the JVM and restarted the process.
>> >
>> > I wonder if what's happening here is that the threads that are spewing
>> data
>> > to disk (as that parallelize and persist would do) can write to disk
>> faster
>> > than the cleanup threads can delete from disk.
>> >
>> > What do you think of that theory?
>> >
>> >
>> > Andrew
>> >
>> >
>> >
>> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mri...@gmail.com>
>> > wrote:
>> >>
>> >> shutdown hooks should not take 15 mins are you mentioned !
>> >> On the other hand, how busy was your disk when this was happening ?
>> >> (either due to spark or something else ?)
>> >>
>> >> It might just be that there was a lot of stuff to remove ?
>> >>
>> >> Regards,
>> >> Mridul
>> >>
>> >>
>> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <and...@andrewash.com>
>> wrote:
>> >> > Hi Spark devs,
>> >> >
>> >> > Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0 one
>> >> > of
>> >> > my workers goes dead in the spark master UI.  I'm using the standalone
>> >> > cluster and didn't ever see this while using 0.8.0 so I think it may
>> be
>> >> > a
>> >> > regression.
>> >> >
>> >> > When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack
>> and
>> >> > jmap -heap, it doesn't respond unless I add the -F force flag.  The
>> heap
>> >> > isn't full, but there are some interesting bits in the jstack.  Poking
>> >> > around a little, I think there may be some kind of deadlock in the
>> >> > shutdown
>> >> > hooks.
>> >> >
>> >> > Below are the threads I think are most interesting:
>> >> >
>> >> > Thread 14308: (state = BLOCKED)
>> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
>> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object,
>> >> > scala.Function1) @bci=352, line=81 (Interpreted frame)
>> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25,
>> >> > line=498
>> >> > (Interpreted frame)
>> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39,
>> line=456
>> >> > (Interpreted frame)
>> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237
>> >> > (Interpreted frame)
>> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
>> >> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec()
>> >> > @bci=4, line=386 (Interpreted frame)
>> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260
>> >> > (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask)
>> >> > @bci=10, line=1339 (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue)
>> >> > @bci=11, line=1979 (Compiled frame)
>> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14,
>> >> > line=107
>> >> > (Interpreted frame)
>> >> >
>> >> > Thread 3865: (state = BLOCKED)
>> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
>> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
>> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106
>> >> > (Interpreted frame)
>> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46
>> >> > (Interpreted
>> >> > frame)
>> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
>> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
>> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52
>> >> > (Interpreted frame)
>> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
>> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>> >> >
>> >> >
>> >> > Thread 3987: (state = BLOCKED)
>> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted
>> frame)
>> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1,
>> >> > line=466 (Interpreted frame)
>> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>> @bci=9,
>> >> > line=478 (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File)
>> >> > @bci=4, line=479 (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object)
>> >> > @bci=5, line=478 (Compiled frame)
>> >> >  -
>> >> >
>> >> >
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
>> >> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1)
>> >> > @bci=2,
>> >> > line=34 (Compiled frame)
>> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File)
>> @bci=19,
>> >> > line=478 (Interpreted frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File)
>> >> > @bci=14, line=141 (Interpreted frame)
>> >> >  -
>> >> >
>> >> >
>> org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object)
>> >> > @bci=5, line=139 (Interpreted frame)
>> >> >  -
>> >> >
>> >> >
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> >> > scala.Function1) @bci=22, line=33 (Compiled frame)
>> >> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>> >> > @bci=2,
>> >> > line=108 (Interpreted frame)
>> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39,
>> >> > line=139 (Interpreted frame)
>> >> >
>> >> >
>> >> > I think what happened here is that thread 14308 received the akka
>> >> > "shutdown" message and called System.exit().  This started thread
>> 3865,
>> >> > which is the JVM shutting itself down.  Part of that process is
>> running
>> >> > the
>> >> > shutdown hooks, so it started thread 3987.  That thread is the
>> shutdown
>> >> > hook from addShutdownHook() in DiskBlockManager.scala, which looks
>> like
>> >> > this:
>> >> >
>> >> >   private def addShutdownHook() {
>> >> >     localDirs.foreach(localDir =>
>> >> > Utils.registerShutdownDeleteDir(localDir))
>> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local
>> >> > dirs") {
>> >> >       override def run() {
>> >> >         logDebug("Shutdown hook called")
>> >> >         localDirs.foreach { localDir =>
>> >> >           try {
>> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir))
>> >> > Utils.deleteRecursively(localDir)
>> >> >           } catch {
>> >> >             case t: Throwable =>
>> >> >               logError("Exception while deleting local spark dir: " +
>> >> > localDir, t)
>> >> >           }
>> >> >         }
>> >> >
>> >> >         if (shuffleSender != null) {
>> >> >           shuffleSender.stop()
>> >> >         }
>> >> >       }
>> >> >     })
>> >> >   }
>> >> >
>> >> > It goes through and deletes the directories recursively.  I was
>> thinking
>> >> > there might be some issues with concurrently-running shutdown hooks
>> >> > deleting things out from underneath each other (shutdown hook javadocs
>> >> > say
>> >> > they're all started in parallel if multiple hooks are added) causing
>> the
>> >> > File.list() in that last thread to take quite some time.
>> >> >
>> >> > While I was looking through the stacktrace the JVM finally exited
>> (after
>> >> > 15-20min at least) so I won't be able to debug more until this bug
>> >> > strikes
>> >> > again.
>> >> >
>> >> > Any ideas on what might be going on here?
>> >> >
>> >> > Thanks!
>> >> > Andrew
>> >
>> >
>>

Reply via email to