Denis Buravlev created SPARK-15669:
--------------------------------------

             Summary: Driver and Executor have different akka configuration in 
YARN cluster mode
                 Key: SPARK-15669
                 URL: https://issues.apache.org/jira/browse/SPARK-15669
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, Streaming
    Affects Versions: 1.6.1
         Environment: Scala: 2.10
Spark: 1.6.1
Yarn Cluster
            Reporter: Denis Buravlev
            Priority: Critical


I'm trying to run a Spark Streaming application that uses ZeroMQ on a YARN 
cluster. The application fails with the following message: {{ERROR 
actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'}}.
The configuration file exists and is available to both the Driver and the Executor.

After some investigation I found one interesting thing: if you execute 
the code
{{println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error!"))}}
 
you'll get different results for Driver and Executor:
Driver stdout log: the value from the configuration (e.g. 100ms)
Executor stdout log: "error!"
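
The check above swallows the exception, so the executor log only shows that the lookup failed, not why. A small variation that keeps the exception class and message might make the two logs easier to compare. This is only a sketch: {{LookupDiagnostics}} and {{describeLookup}} are hypothetical names introduced for illustration and are not part of Spark or Akka.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper (not part of Spark or Akka): evaluates a lookup and
// reports either its value or the exception class and message.
object LookupDiagnostics {
  def describeLookup[A](key: String)(read: => A): String =
    Try(read) match {
      case Success(value) => s"$key = $value"
      case Failure(e)     => s"$key failed: ${e.getClass.getName}: ${e.getMessage}"
    }
}
```

It could be called on both sides as {{println(LookupDiagnostics.describeLookup("akka.zeromq.poll-timeout")(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")))}}, assuming the same Spark 1.6.1 API used in the check above.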

*Is this the correct behaviour for Spark?*

Full code for reproducing:
{code}
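import scala.collection.mutable.Queue
import scala.util.Try

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Application {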

  def main(args: Array[String]) {
    // ConfigurationManager is an application-specific helper (not shown here).
    val config = ConfigurationManager.getConfig(args)
    val sparkConf = new SparkConf().setAppName("TestApp")

    val ssc = new StreamingContext(sparkConf, Seconds(1))

    println("Driver:")
    println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error1"))

    val rddQueue = new Queue[RDD[Int]]()

    val inputStream = ssc.queueStream(rddQueue)

    val mappedStream = inputStream.map{x =>
      println("Executor:")
      println(Try(SparkEnv.get.actorSystem.settings.config.getString("akka.zeromq.poll-timeout")).toOption.getOrElse("error2"))

      (x % 10, 1)
    }
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()

    ssc.start()

    for (i <- 1 to 30) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
      }
      Thread.sleep(1000)
    }
    ssc.awaitTermination()
  }
}
{code}



