[ https://issues.apache.org/jira/browse/SPARK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827181#comment-15827181 ]

hustfxj edited comment on SPARK-19264 at 1/18/17 1:16 AM:
----------------------------------------------------------

[~srowen] sorry, I didn't say it clearly. I mean the Spark application can't 
finish while it still contains unfinished non-daemon threads. Look at the 
following example: the driver program should crash due to the exception, but 
in fact it can't exit because the non-daemon timer thread is still running.
{code:title=Test.scala|borderStyle=solid}
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    sparkConf.set("spark.streaming.blockInterval", "1000ms")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // non-daemon thread: it keeps the JVM alive even after the main thread dies
    val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactory() {
        def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
      })
    scheduledExecutorService.scheduleAtFixedRate(
      new Runnable() {
        def run() {
          try {
            System.out.println("runnable")
          } catch {
            case e: Exception =>
              System.out.println("ScheduledTask persistAllConsumerOffset exception: " + e)
          }
        }
      }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)
    val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 10)
    wordCounts.foreachRDD { rdd =>
      rdd.collect().foreach(println)
      // this should crash the driver, but the JVM stays up because of the timer thread
      throw new RuntimeException
    }
    ssc.start()
    ssc.awaitTermination()
{code}
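
This is easy to reproduce without Spark at all. A standalone sketch (my 
illustration, not code from Spark or from this issue) showing the JVM rule:
{code:title=NonDaemonSketch.scala|borderStyle=solid}
object NonDaemonSketch {
  def main(args: Array[String]): Unit = {
    val t = new Thread(new Runnable {
      def run(): Unit = while (true) { Thread.sleep(5000); println("timer still alive") }
    }, "non-daemon-timer")
    t.start()
    // The uncaught exception below kills only the main thread. The process
    // keeps printing "timer still alive" because a non-daemon thread is still
    // running; with t.setDaemon(true) before t.start() the JVM would exit.
    throw new RuntimeException("main thread crashes here")
  }
}
{code}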



> Worker should start the driver the same way as the AM of YARN
> --------------------------------------------------------------
>
>                 Key: SPARK-19264
>                 URL: https://issues.apache.org/jira/browse/SPARK-19264
>             Project: Spark
>          Issue Type: Improvement
>            Reporter: hustfxj
>
>   I think the worker shouldn't start the driver via "ProcessBuilderLike": if 
> the application's main thread spawns non-daemon threads, the worker can't 
> tell whether the application has finished, because the JVM only terminates 
> when no non-daemon thread is running any more (or someone calls 
> System.exit), and the main thread may have finished long ago.
>     The worker should start the driver the way the AM of YARN does, as follows:
> {code:title=ApplicationMaster.scala|borderStyle=solid}    
>      mainMethod.invoke(null, userArgs.toArray)
>      finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
>      logDebug("Done running users class")
> {code}
> Then the worker can monitor the driver's main thread and know the 
> application's state. 
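> A minimal sketch of this idea (runDriver and the thread handling are my 
> illustration, not Spark's actual worker code):
> {code:title=DriverRunnerSketch.scala|borderStyle=solid}
> import java.lang.reflect.InvocationTargetException
> 
> // Run the driver's main method in a monitored thread, the way YARN's
> // ApplicationMaster runs the user class, so the worker can observe
> // whether the application succeeded or failed.
> def runDriver(driverClass: String, args: Array[String]): Boolean = {
>   val mainMethod = Class.forName(driverClass)
>     .getMethod("main", classOf[Array[String]])
>   var succeeded = false
>   val userThread = new Thread("Driver") {
>     override def run(): Unit =
>       try {
>         mainMethod.invoke(null, args)
>         succeeded = true // main() returned normally
>       } catch {
>         case e: InvocationTargetException =>
>           println("User class threw: " + e.getCause)
>       }
>   }
>   userThread.start()
>   userThread.join() // join() makes the write to succeeded visible here
>   succeeded
> }
> {code}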


