Here is my main file:
/**
 * Author: zhouhh
 * Date: 2017-06-26
 *
 * Main file: starts the web server, initializes Akka and Spark Streaming.
 */
object WebServer extends Directives {
  val log = Logger(LoggerFactory.getLogger("WebServer"))
  log.info("==========enter WebServer init========")

  // init ActorSystem
  implicit val system = ActorSystem("recommed_System")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  // init Spark Streaming
  val sparkConf: SparkConf =
    new SparkConf().setMaster(Config.sparkMaster).setAppName("WebUserActionConsumer")
  val ssc: StreamingContext =
    new StreamingContext(sparkConf, Seconds(Config.batchDuration))

  // init routes
  val routes: Route = BaseRoutes.baseRoutes ~
    new RcmdRoutes().rcmdRoutes ~
    SimpleRoutes.simpleRoutes
  def main(args: Array[String]): Unit = {
    log.debug("==========enter WebServer beginning========")
    val userActConsumerActor =
      system.actorOf(Props(new UserActConsumerActor(ssc)), name = "UserActConsumerActor")
    userActConsumerActor ! UserActMsgConsumer

    val bindingFuture = Http().bindAndHandle(routes, Config.host, Config.port)
    log.info(s"Server online at ${Config.host}:${Config.port}/\nPress RETURN to stop...")

    ssc.start()
    ssc.awaitTermination()

    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind())                 // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
Here is my Spark Streaming Kafka consumer file:
class UserActConsumerActor(ssc: StreamingContext) extends Actor {
  val logger = Logger(LoggerFactory.getLogger("UserActConsumerActor"))
  val userActMsgParseActor: ActorRef =
    context.actorOf(Props[UserActMsgParseActor], name = "UserActMsgParseActor")

  def receive = {
    case UserActMsgConsumer => parse()
    case e                  => logger.info(s"unexpected user act msg: $e")
  }

  val stream: InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe[String, String](Config.useracttopics.split(","), Config.kafkaParams)
    )

  def parse(): Unit = {
    stream.foreachRDD(rdd => rdd.foreach { record =>
      logger.info("======user act value:====\n")
      logger.info(record.value()) // value is a String
      Try {
        record.value().parseJson.convertTo[UserActMsg]
      } match {
        case Success(msg) => userActMsgParseActor ! msg
        case Failure(err) => logger.error(s"$err")
      }
    })
  }
}
When I run the web server, it repeatedly reports the error from the title:
17/08/04 11:40:15 ERROR JobScheduler: Error running job streaming job 1501818015000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
    at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(UserActConsumerActor.scala:46)
    at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(UserActConsumerActor.scala:46)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ronglian.actor.UserActConsumerActor
Serialization stack:
    - object not serializable (class: com.ronglian.actor.UserActConsumerActor, value: com.ronglian.actor.UserActConsumerActor@a711490)
    - field (class: com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1, name: $outer, type: class com.ronglian.actor.UserActConsumerActor)
    - object (class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1, <function1>)
    - field (class: com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1$$anonfun$apply$1, name: $outer, type: class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1)
    - object (class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 30 more
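If I read the serialization stack correctly, the closure passed to rdd.foreach references logger and userActMsgParseActor, which are members of the actor, so Spark drags the whole UserActConsumerActor in through the $outer field. A minimal sketch of the same capture problem (hypothetical names, not from my project):

import org.apache.spark.rdd.RDD

// Hypothetical example: a reference to a member of the enclosing class
// inside an RDD closure is really `this.member`, so the whole instance
// must be serialized and shipped with the task.
class Outer {
  val member = "captured"

  def run(rdd: RDD[String]): Unit = {
    rdd.foreach(x => println(member + x)) // fails if Outer is not Serializable
  }
}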
I don't know how to make the UserActConsumerActor class serializable.
Can anyone give me a suggestion?
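For what it's worth, one workaround I am considering (an untested sketch; it assumes the spray-json format for UserActMsg is an ordinary imported implicit, and it drops the per-record logging on the executors) is to keep the executor-side closure free of actor members and to talk to the actor only on the driver:

  def parse(): Unit = {
    // Untested sketch: the flatMap closure below references no members of
    // the actor, so only a small serializable function is shipped to executors.
    val target = userActMsgParseActor // local val, so closures capture the ref, not `this`
    stream.foreachRDD { rdd =>
      // The foreachRDD body itself runs on the driver; collect() brings the
      // parsed messages back, which should be fine for modest batch sizes.
      val msgs = rdd
        .flatMap(record => Try(record.value().parseJson.convertTo[UserActMsg]).toOption)
        .collect()
      msgs.foreach(target ! _)
    }
  }

I suppose the logger could be recreated per executor with a @transient lazy val, but I don't see an equally clean option for the ActorRef, hence my question.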