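I don't know Spark, so I'm not sure exactly what this is trying to do,
but Actors are typically not serializable -- you send the ActorRef for
the Actor, not the Actor itself.  I'm not sure it even makes sense
semantically to try to serialize and send an Actor.  The serialization
stack in your trace shows the closures created in parse() holding an
$outer reference to UserActConsumerActor, so Spark ends up trying to
serialize the whole Actor along with the closure.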

On Thu, Aug 3, 2017 at 11:49 PM, 周梦想 <abloz...@gmail.com> wrote:

>
> my main file
> /**
>  * Author: zhouhh
>  * Date  : 2017-06-26
>  *
>  * main file,start web server, init akka ,spark streaming
>  */
> object WebServer extends Directives {
>
>   val log = Logger(LoggerFactory.getLogger("WebServer"))
>   log.info("==========enter WebServer init========")
>
>   //init ActorSystem
>   implicit val system = ActorSystem("recommed_System")
>   implicit val materializer = ActorMaterializer()
>   implicit val executionContext: ExecutionContextExecutor =
> system.dispatcher
>
>   //init spark streaming
>   val sparkConf: SparkConf = new SparkConf().setMaster(Config.
> sparkMaster).setAppName("WebUserActionConsumer")
>   val ssc: StreamingContext = new StreamingContext(sparkConf,
> Seconds(Config.batchDuration))
>
>   //init routes
>   val routes: Route = BaseRoutes.baseRoutes ~
>     new RcmdRoutes().rcmdRoutes ~
>     SimpleRoutes.simpleRoutes
>
>   def main(args: Array[String]) {
>
>     log.debug("==========enter WebServer beginning========")
>
>
>     val userActConsumerActor = system.actorOf(Props(new
> UserActConsumerActor(ssc)), name = "UserActConsumerActor")
>
>     userActConsumerActor ! UserActMsgConsumer
>
>
>     val bindingFuture = Http().bindAndHandle(routes, Config.host,
> Config.port)
>
>     log.info(s"Server online at ${Config.host}:${Config.port}/\nPress
> RETURN to stop...")
>
>     ssc.start()
>     ssc.awaitTermination()
>
>     log.info(s"Server online at ${Config.host}:${Config.port}/\nPress
> RETURN to stop...")
>
>     StdIn.readLine() // let it run until user presses return
>     bindingFuture
>       .flatMap(_.unbind()) // trigger unbinding from the port
>       .onComplete(_ => system.terminate()) // and shutdown when done
>
>   }
>
>
> my spark streaming kafka consumer file
>
> class UserActConsumerActor(ssc: StreamingContext) extends Actor {
>   val logger = Logger(LoggerFactory.getLogger("UserActConsumerActor"))
>
>   val userActMsgParseActor: ActorRef = 
> context.actorOf(Props[UserActMsgParseActor],
> name = "UserActMsgParseActor")
>
>   def receive = {
>
>     case UserActMsgConsumer => parse()
>     case e => logger.info(s"error receive of user act msg:  $e")
>   }
>
>   val stream: InputDStream[ConsumerRecord[String, String]] =
> KafkaUtils.createDirectStream(
>     ssc,
>     PreferConsistent,
>     Subscribe[String, String](Config.useracttopics.split(","),
> Config.kafkaParams)
>   )
>
>   def parse(): Unit = {
>
>     stream.foreachRDD(rdd => rdd.foreach(x => {
>       logger.info("======user act value:====\n")
>       logger.info(x.value()) //value is String
>
>       Try {
>         x.value().parseJson.convertTo[UserActMsg]
>       } match {
>         case Success(msg) => userActMsgParseActor ! msg
>         //        case Success(msg) => context.parent ! msg
>         case Failure(x) => logger.error(s"$x") //println(s"$x") //
>         //        case Success(msg) => println(msg)
>
>       }
>
>     }))
>   }
>
> }
>
>
> When I run the web server, it repeatedly reports the error in the title.
>
> 17/08/04 11:40:15 ERROR JobScheduler: Error running job streaming job
> 1501818015000 ms.0
> org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:298)
> at org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
> at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(
> UserActConsumerActor.scala:46)
> at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(
> UserActConsumerActor.scala:46)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$
> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$
> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
> at org.apache.spark.streaming.dstream.ForEachDStream$$
> anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at org.apache.spark.streaming.dstream.ForEachDStream$$
> anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at org.apache.spark.streaming.dstream.ForEachDStream$$
> anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at org.apache.spark.streaming.dstream.DStream.
> createRDDWithLocalProperties(DStream.scala:416)
> at org.apache.spark.streaming.dstream.ForEachDStream$$
> anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(
> ForEachDStream.scala:50)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(
> ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at org.apache.spark.streaming.scheduler.JobScheduler$
> JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
> at org.apache.spark.streaming.scheduler.JobScheduler$
> JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at org.apache.spark.streaming.scheduler.JobScheduler$
> JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at org.apache.spark.streaming.scheduler.JobScheduler$
> JobHandler.run(JobScheduler.scala:256)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.NotSerializableException: com.ronglian.actor.
> UserActConsumerActor
> Serialization stack:
> - object not serializable (class: com.ronglian.actor.UserActConsumerActor,
> value: com.ronglian.actor.UserActConsumerActor@a711490)
> - field (class: com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1,
> name: $outer, type: class com.ronglian.actor.UserActConsumerActor)
> - object (class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1,
> <function1>)
> - field (class: 
> com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1$$anonfun$apply$1,
> name: $outer, type: class com.ronglian.actor.
> UserActConsumerActor$$anonfun$parse$1)
> - object (class 
> com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1$$anonfun$apply$1,
> <function1>)
> at org.apache.spark.serializer.SerializationDebugger$.improveException(
> SerializationDebugger.scala:40)
> at org.apache.spark.serializer.JavaSerializationStream.
> writeObject(JavaSerializer.scala:46)
> at org.apache.spark.serializer.JavaSerializerInstance.
> serialize(JavaSerializer.scala:100)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:295)
> ... 30 more
>
>
> I don't know how to serialize the UserActConsumerActor class.
> Can anyone give me a suggestion?
>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to