Hi Rick,

The error message hints at an incorrect list of parameters being passed:

  constructor public akka.actor.CreatorFunctionConsumer(scala.Function0)
  is incompatible with arguments [class java.lang.Class, class
  org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
Note that the second argument is a specific lambda expression, so this looks like a problem with serializing lambdas. The chances of getting a good answer may be better in a more Spark-focused forum (see here: https://spark.apache.org/community.html).

-- Johan
Akka Team

On Wed, Aug 31, 2016 at 12:56 AM, Ricky <[email protected]> wrote:

> I'm running Spark on CDH 5.5.2, which has a dependency on Akka 2.2.3.
>
> I'm attempting to create an actorStream in Spark, but during the creation
> of the actor I get the following exception:
>
> Caused by: java.lang.IllegalArgumentException: constructor public
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
> arguments [class java.lang.Class, class
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>     at akka.util.Reflect$.instantiate(Reflect.scala:69)
>
> which traces back to
> https://github.com/akka/akka/blob/v2.2.3/akka-actor/src/main/scala/akka/actor/Props.scala#L337.
>
> Below is a snippet of my code:
>
> class SampleActorReceiver(url: String) extends Actor with ActorHelper {
>
>   lazy private val remotePublisher = context.actorSelection(url)
>
>   override def preStart() {
>     remotePublisher ! SubscribeReceiver(context.self)
>   }
>
>   def receive = {
>     case msg => store(msg.asInstanceOf[String])
>   }
> }
>
> object ActorStreaming extends App {
>
>   val system = ActorSystem("ActorStreaming")
>   val feederRef = system.actorOf(Props[FeederActor], "FeederActor")
>
>   val sparkConf = new SparkConf().setAppName("Actor Streaming")
>   val ssc = new StreamingContext(sparkConf, Seconds(2))
>
>   val url = feederRef.path.toStringWithAddress(Address("akka.tcp",
>     system.name, InetAddress.getLocalHost.getHostAddress, 2552))
>
>   val props = Props.create(classOf[SampleActorReceiver], url)
>
>   val lines = ssc.actorStream[String](props, "SampleReceiver")
>
>   lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
>
>   ssc.start()
>   ssc.awaitTermination()
>
>   system.awaitTermination()
> }
>
> There appears to be an issue with the Creator function being improperly
> serialized when Spark sends the function from the driver to the receiver.
> Are there any workarounds, or is there something wrong with my code? By
> the way, it works if I run it locally using spark-submit, but fails if I
> submit it on the CDH cluster.
>
> Thanks in advance.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
--
Akka Team
Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM
Twitter: @akkateam
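[Editor's note: a minimal sketch of the distinction Johan points at, for readers of the archive. The `Echo` actor and its argument are hypothetical, not from the thread; this assumes the Akka 2.2.x `Props` API. A `Props` built from a closure wraps a `scala.Function0` (the `$$anonfun` class in the error), which must be serialized between JVMs; a `Props` built from a `Class` plus constructor arguments carries only the class reference and the argument values.]

```scala
import akka.actor.{Actor, Props}

// Hypothetical actor for illustration only.
class Echo(prefix: String) extends Actor {
  def receive = { case msg => sender() ! s"$prefix$msg" }
}

// Closure form: captures an anonymous Function0, which Akka wraps in a
// CreatorFunctionConsumer and which must survive serialization intact.
val closureProps = Props(new Echo("echo: "))

// Class-and-args form: no closure involved; Akka reflectively calls the
// matching constructor with the given argument list on the receiving side.
val classProps = Props(classOf[Echo], "echo: ")
```

Whether either form round-trips cleanly still depends on both sides of the wire running compatible Akka versions, which is why mixed Spark/CDH classpaths are worth ruling out first.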
