My main file:
/**
 * Author: zhouhh
 * Date  : 2017-06-26
 *
 * Main file: starts the web server and initializes Akka and Spark Streaming.
 */
object WebServer extends Directives {

  val log = Logger(LoggerFactory.getLogger("WebServer"))
  log.info("==========enter WebServer init========")

  // init the ActorSystem
  implicit val system = ActorSystem("recommed_System")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  // init Spark Streaming
  val sparkConf: SparkConf =
    new SparkConf().setMaster(Config.sparkMaster).setAppName("WebUserActionConsumer")
  val ssc: StreamingContext =
    new StreamingContext(sparkConf, Seconds(Config.batchDuration))

  // init routes
  val routes: Route = BaseRoutes.baseRoutes ~
    new RcmdRoutes().rcmdRoutes ~
    SimpleRoutes.simpleRoutes

  def main(args: Array[String]): Unit = {

    log.debug("==========enter WebServer beginning========")


    val userActConsumerActor =
      system.actorOf(Props(new UserActConsumerActor(ssc)), name = "UserActConsumerActor")

    userActConsumerActor ! UserActMsgConsumer


    val bindingFuture = Http().bindAndHandle(routes, Config.host, Config.port)

    log.info(s"Server online at ${Config.host}:${Config.port}/\nPress 
RETURN to stop...")

    ssc.start()
    ssc.awaitTermination() // blocks until the streaming context is stopped, so the lines below run only after that

    log.info(s"Server online at ${Config.host}:${Config.port}/\nPress 
RETURN to stop...")

    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done

  }
}
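
I also noticed that ssc.awaitTermination() blocks, so the StdIn.readLine() / unbind shutdown code after it never runs as written. A possible reordering, sketched on the assumption that stopping everything when the user presses RETURN is the intent:

    ssc.start()
    log.info(s"Server online at ${Config.host}:${Config.port}/\nPress RETURN to stop...")
    StdIn.readLine() // block the main thread until the user presses RETURN
    ssc.stop(stopSparkContext = true, stopGracefully = true) // finish in-flight batches, then stop
    bindingFuture
      .flatMap(_.unbind())                 // release the HTTP port
      .onComplete(_ => system.terminate()) // then shut down the actor system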


My Spark Streaming Kafka consumer file:

class UserActConsumerActor(ssc: StreamingContext) extends Actor {
  val logger = Logger(LoggerFactory.getLogger("UserActConsumerActor"))

  val userActMsgParseActor: ActorRef =
    context.actorOf(Props[UserActMsgParseActor], name = "UserActMsgParseActor")

  def receive = {

    case UserActMsgConsumer => parse()
    case e => logger.warn(s"unexpected user act message: $e")
  }

  val stream: InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe[String, String](Config.useracttopics.split(","), Config.kafkaParams)
    )

  def parse(): Unit = {

    stream.foreachRDD(rdd => rdd.foreach { record =>
      logger.info("====== user act value ======")
      logger.info(record.value()) // the value is a JSON String

      Try {
        record.value().parseJson.convertTo[UserActMsg]
      } match {
        case Success(msg) => userActMsgParseActor ! msg
        case Failure(e)   => logger.error(s"failed to parse user act msg: $e")
      }
    })
  }

}


When I run the web server, it repeatedly reports the error from the subject line:

17/08/04 11:40:15 ERROR JobScheduler: Error running job streaming job 1501818015000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
    at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(UserActConsumerActor.scala:46)
    at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(UserActConsumerActor.scala:46)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ronglian.actor.UserActConsumerActor
Serialization stack:
    - object not serializable (class: com.ronglian.actor.UserActConsumerActor, value: com.ronglian.actor.UserActConsumerActor@a711490)
    - field (class: com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1, name: $outer, type: class com.ronglian.actor.UserActConsumerActor)
    - object (class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1, <function1>)
    - field (class: com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1$$anonfun$apply$1, name: $outer, type: class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1)
    - object (class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 30 more


I don't know how to make the UserActConsumerActor class serializable, or how to restructure the code so it doesn't need to be serialized.
Can anyone give me a suggestion? One workaround I'm considering is sketched below.
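
My reading of the serialization stack above is that the closure passed to rdd.foreach references the actor's fields (logger and userActMsgParseActor) and so captures the whole actor through its $outer reference. The idea of the sketch is to keep the per-record work on the driver by collecting each batch first, so the actor is never shipped to executors. This assumes each batch is small enough to collect():

  def parse(): Unit = {
    stream.foreachRDD { rdd =>
      // collect() brings the batch back to the driver JVM; the foreach
      // below then runs locally, so the closure that references `logger`
      // and `userActMsgParseActor` (and therefore `this`) is never
      // serialized or sent to executors.
      rdd.collect().foreach { record =>
        logger.info(s"user act value: ${record.value()}")
        Try(record.value().parseJson.convertTo[UserActMsg]) match {
          case Success(msg) => userActMsgParseActor ! msg
          case Failure(e)   => logger.error(s"failed to parse user act msg: $e")
        }
      }
    }
  }

I'm not sure this is idiomatic, though: it pulls every record through the driver, and it only helps while checkpointing is off (a checkpoint would serialize the DStream graph, this closure included). I also wonder whether foreachRDD needs to be registered before ssc.start(), since the actor message that triggers parse() is delivered asynchronously.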

