Below is the full stacktrace(real names of my classes were changed) with short description of entries from my code:
rdd.mapPartitions{ partition => //this is the line to which second stacktrace entry is pointing val sender = broadcastedValue.value // this is the maing place to which first stacktrace entry is pointing } java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter at com.example.flow.Calculator $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87) at com.example.flow.Calculator $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>: > Can you show the complete stack trace for the ClassCastException ? > > Please see the following thread: > http://search-hadoop.com/m/q3RTtgEUHVmJA1T1 > > Cheers > > On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote: > >> Hey all, >> >> When my streaming application is restarting from failure (from >> checkpoint) I >> am receiving strange error: >> >> java.lang.ClassCastException: >> org.apache.spark.util.SerializableConfiguration cannot be cast to >> com.example.sender.MyClassReporter. >> >> Instance of B class is created on driver side (with proper config passed >> as >> constructor arg) and broadcasted to the executors in order to ensure that >> on >> each worker there will be only single instance. Everything is going well >> up >> to place where I am getting value of broadcasted field and executing >> function on it i.e. >> broadcastedValue.value.send(...) >> >> Below you can find definition of MyClassReporter (with trait): >> >> trait Reporter{ >> def send(name: String, value: String, timestamp: Long) : Unit >> def flush() : Unit >> } >> >> class MyClassReporter(config: MyClassConfig, flow: String) extends >> Reporter >> with Serializable { >> >> val prefix = s"${config.senderConfig.prefix}.$flow" >> >> var counter = 0 >> >> @transient >> private lazy val sender : GraphiteSender = initialize() >> >> @transient >> private lazy val threadPool = >> ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()) >> >> private def initialize() = { >> val sender = new Sender( >> new InetSocketAddress(config.senderConfig.hostname, >> config.senderConfig.port) >> ) >> sys.addShutdownHook{ >> sender.close() >> } >> sender >> } >> >> override def send(name: String, value: String, timestamp: Long) : Unit >> = { >> threadPool.submit(new Runnable { >> override def run(): Unit = { >> try { >> counter += 1 >> if (!sender.isConnected) >> sender.connect() >> sender.send(s"$prefix.$name", value, timestamp) >> if (counter % graphiteConfig.batchSize == 0) >> sender.flush() >> }catch { >> case NonFatal(e) => { >> println(s"Problem with sending metric to graphite >> $prefix.$name: >> $value at $timestamp: ${e.getMessage}", e) >> Try{sender.close()}.recover{ >> case NonFatal(e) => println(s"Error closing graphite >> ${e.getMessage}", e) >> } >> } >> } >> } >> }) >> } >> >> Do you have any idea how I can solve this issue? Using broadcasted >> variable >> helps me keeping single socket open to the service on executor. >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >