Below is the full stacktrace(real names of my classes were changed) with
short description of entries from my code:

rdd.mapPartitions{ partition => //this is the line to which second
stacktrace entry is pointing
  val sender =  broadcastedValue.value // this is the maing place to which
first stacktrace entry is pointing
}

java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to
com.example.sender.MyClassReporter
        at com.example.flow.Calculator
$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
        at com.example.flow.Calculator
$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
        at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
        at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
        at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:

> Can you show the complete stack trace for the ClassCastException ?
>
> Please see the following thread:
> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>
> Cheers
>
> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote:
>
>> Hey all,
>>
>> When my streaming application is restarting from failure (from
>> checkpoint) I
>> am receiving strange error:
>>
>> java.lang.ClassCastException:
>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>> com.example.sender.MyClassReporter.
>>
>> Instance of B class is created on driver side (with proper config passed
>> as
>> constructor arg) and broadcasted to the executors in order to ensure that
>> on
>> each worker there will be only single instance. Everything is going well
>> up
>> to place where I am getting value of broadcasted field and executing
>> function on it i.e.
>> broadcastedValue.value.send(...)
>>
>> Below you can find definition of MyClassReporter (with trait):
>>
>> trait Reporter{
>>   def send(name: String, value: String, timestamp: Long) : Unit
>>   def flush() : Unit
>> }
>>
>> class MyClassReporter(config: MyClassConfig, flow: String) extends
>> Reporter
>> with Serializable {
>>
>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>
>>   var counter = 0
>>
>>   @transient
>>   private lazy val sender : GraphiteSender = initialize()
>>
>>   @transient
>>   private lazy val threadPool =
>> ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>
>>   private def initialize() = {
>>       val sender = new Sender(
>>         new InetSocketAddress(config.senderConfig.hostname,
>> config.senderConfig.port)
>>       )
>>       sys.addShutdownHook{
>>         sender.close()
>>       }
>>       sender
>>   }
>>
>>   override def send(name: String, value: String, timestamp: Long) : Unit
>> = {
>>     threadPool.submit(new Runnable {
>>       override def run(): Unit = {
>>         try {
>>           counter += 1
>>           if (!sender.isConnected)
>>             sender.connect()
>>           sender.send(s"$prefix.$name", value, timestamp)
>>           if (counter % graphiteConfig.batchSize == 0)
>>             sender.flush()
>>         }catch {
>>           case NonFatal(e) => {
>>             println(s"Problem with sending metric to graphite
>> $prefix.$name:
>> $value at $timestamp: ${e.getMessage}", e)
>>             Try{sender.close()}.recover{
>>               case NonFatal(e) => println(s"Error closing graphite
>> ${e.getMessage}", e)
>>             }
>>           }
>>         }
>>       }
>>     })
>>   }
>>
>> Do you have any idea how I can solve this issue? Using broadcasted
>> variable
>> helps me keeping single socket open to the service on executor.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

Reply via email to