Which version of Spark are you using? You may be hitting a known, already-fixed bug where the receivers never get the stop signal, so a graceful stop (stopGracefully = true) can end up waiting indefinitely for the receivers to stop. Try setting stopGracefully to false and see if that works. The bug should be fixed in Spark 1.2.1.
https://issues.apache.org/jira/browse/SPARK-5035

TD

On Thu, Mar 12, 2015 at 7:48 PM, Jose Fernandez <jfernan...@sdl.com> wrote:

> Thanks for the reply!
>
> Theoretically I should be able to do as you suggest, as I follow the pool
> design pattern from the documentation, but I don’t seem to be able to run
> any code after .stop() is called.
>
>   override def main(args: Array[String]) {
>     // setup
>     val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
>     val inputStreams = (1 to numReceivers).map(i =>
>       ssc.receiverStream(<custom receiver>))
>     val messages = ssc.union(inputStreams)
>
>     messages.foreachRDD { rdd =>
>       rdd.foreachPartition { p =>
>         val indexer = Indexer.getInstance()
>
>         p.foreach(Indexer.process(_) match {
>           case Some(entry) => indexer.index(entry)
>           case None =>
>         })
>
>         Indexer.returnInstance(indexer)
>       }
>     }
>
>     messages.print()
>
>     sys.ShutdownHookThread {
>       logInfo("****************** Shutdown hook triggered ******************")
>       ssc.stop(false, true)
>       logInfo("****************** Shutdown finished ******************")
>       ssc.stop(true)
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
> The first shutdown log message is always displayed, but the second message
> never is. I’ve tried multiple permutations of the stop function calls and
> even used try/catch around them. I’m running in yarn-cluster mode using Spark
> 1.2 on CDH 5.3. I stop the application with yarn application -kill <appID>.
>
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Thursday, March 12, 2015 1:29 PM
> *To:* Jose Fernandez
> *Cc:* user@spark.apache.org
> *Subject:* Re: Handling worker batch processing during driver shutdown
>
> Can you access the batcher directly? Is there a handle for getting access
> to the batchers on the executors by running a task on each executor?
> If so, after the StreamingContext has been stopped (not the
> SparkContext), you can use `sc.makeRDD()` to run a dummy task like
> this:
>
>   sc.makeRDD(1 to 1000, 1000).foreach { x =>
>     Batcher.get().flush()
>   }
>
> With a large number of tasks and no other jobs running in the system, at
> least one task will run in each executor and will therefore flush the
> batcher.
>
> TD
>
> On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez <jfernan...@sdl.com>
> wrote:
>
> Hi folks,
>
> I have a shutdown hook in my driver which stops the streaming context
> cleanly. This is great, as workers can finish their current processing unit
> before shutting down. Unfortunately, each worker contains a batch processor
> which only flushes every X entries. We’re indexing to different indices in
> Elasticsearch and using the bulk index request for performance. As far as
> Spark is concerned, once data is added to the batcher it is considered
> processed, so our workers are being shut down with data still in the
> batcher.
>
> Is there any way to coordinate the shutdown with the workers? I haven’t
> had any luck searching for a solution online. I would appreciate any
> suggestions you may have.
>
> Thanks :)
>
> <http://www.sdl.com/innovate/sanfran>
>
> SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
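For anyone reading this thread later, here is a rough sketch of how the two suggestions above might be combined: stop only the StreamingContext, run a throwaway job so each executor can flush its batcher, then stop the SparkContext. The `Batcher` object and the `GracefulShutdown.stopAndFlush` helper are hypothetical stand-ins, not code from the thread (the thread's `Batcher.get().flush()` is simplified to `Batcher.flush()`), and the sketch assumes it is called while the driver can still submit jobs, which the thread does not establish for a JVM shutdown hook.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext

import scala.collection.mutable.ArrayBuffer

// Hypothetical per-JVM batcher standing in for the bulk indexer in the thread.
object Batcher {
  private val buffer = ArrayBuffer.empty[String]

  def add(entry: String): Unit = synchronized { buffer += entry }

  def size: Int = synchronized { buffer.size }

  def flush(): Unit = synchronized {
    // A real implementation would send the pending bulk index request here.
    buffer.clear()
  }
}

object GracefulShutdown {
  // numTasks is an assumed tuning knob: many small tasks make it likely
  // (not guaranteed) that every executor runs at least one of them.
  def stopAndFlush(ssc: StreamingContext, numTasks: Int = 1000): Unit = {
    val sc: SparkContext = ssc.sparkContext

    // Stop streaming only; keep the SparkContext alive for one more job.
    ssc.stop(stopSparkContext = false, stopGracefully = true)

    // Dummy job whose only purpose is to call flush() on each executor JVM.
    sc.makeRDD(1 to numTasks, numTasks).foreach { _ => Batcher.flush() }

    // Now the SparkContext can go away as well.
    sc.stop()
  }
}
```

On a Spark version affected by SPARK-5035, the graceful stop in this sketch may hang; passing `stopGracefully = false` is the workaround suggested at the top of the thread, at the cost of not waiting for received data to be processed.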