Hi Rick,

The error message points to a mismatched argument list: the constructor
public akka.actor.CreatorFunctionConsumer(scala.Function0) takes a single
argument, but it is being called with two:
[class java.lang.Class,
class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]

Note that the second argument is a compiled anonymous function (a lambda),
so the problem probably has something to do with how lambdas are serialized.
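
To illustrate what kind of failure the message describes, here is a minimal
sketch of reflective instantiation with a mismatched argument list. It is
not Akka's actual Reflect code; OneArgConsumer, ReflectMismatch and
instantiate are hypothetical names made up for this example, and only plain
Java reflection is used.

```scala
// Hypothetical stand-in for a class whose only constructor takes one
// function argument, similar in shape to CreatorFunctionConsumer(Function0).
class OneArgConsumer(creator: () => AnyRef) {
  def produce(): AnyRef = creator()
}

object ReflectMismatch {
  // Look for a public constructor whose parameters accept the given
  // arguments; throw IllegalArgumentException if none fits.
  // Simplified sketch: primitive parameter types are not handled.
  def instantiate[T](clazz: Class[T], args: Seq[AnyRef]): T = {
    val ctor = clazz.getConstructors.find { c =>
      val params = c.getParameterTypes
      params.length == args.length &&
        params.zip(args).forall { case (p, a) => p.isInstance(a) }
    }
    ctor match {
      case Some(c) => c.newInstance(args: _*).asInstanceOf[T]
      case None =>
        throw new IllegalArgumentException(
          s"no constructor of ${clazz.getName} is compatible with arguments " +
            args.map(_.getClass.getName).mkString("[", ", ", "]"))
    }
  }

  def main(args: Array[String]): Unit = {
    val creator: () => AnyRef = () => "actor"

    // One argument that matches the constructor: instantiation succeeds.
    val ok = instantiate(classOf[OneArgConsumer], Seq(creator))
    println(ok.produce())

    // Two arguments (a Class plus the function): no constructor matches.
    try {
      instantiate(classOf[OneArgConsumer], Seq(classOf[String], creator))
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}
```

The second call fails for the same structural reason as in the stack trace:
the argument list has a different shape than the only available constructor.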

You may have a better chance of getting a good answer in a more
Spark-focused forum (see https://spark.apache.org/community.html).

--
Johan
Akka Team

On Wed, Aug 31, 2016 at 12:56 AM, Ricky <[email protected]> wrote:

> I'm running Spark on CDH 5.5.2 which has a dependency on Akka 2.2.3.
>
> I'm attempting to create an actorStream in Spark, but during the creation
> of the actor I get the following exception.
>
> Caused by: java.lang.IllegalArgumentException: constructor public
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
> arguments [class java.lang.Class, class
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>     at akka.util.Reflect$.instantiate(Reflect.scala:69)
>
> This traces back to
> https://github.com/akka/akka/blob/v2.2.3/akka-actor/src/main/scala/akka/actor/Props.scala#L337.
>
> Below is a snippet of my code.
>
> class SampleActorReceiver(url: String) extends Actor with ActorHelper {
>
>   lazy private val remotePublisher = context.actorSelection(url)
>
>   override def preStart(): Unit = {
>     remotePublisher ! SubscribeReceiver(context.self)
>   }
>
>   def receive = {
>     case msg => store(msg.asInstanceOf[String])
>   }
> }
>
>
> object ActorStreaming extends App {
>
>     val system = ActorSystem("ActorStreaming")
>     val feederRef = system.actorOf(Props[FeederActor], "FeederActor")
>
>     val sparkConf = new SparkConf().setAppName("Actor Streaming")
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>
>     val url = feederRef.path.toStringWithAddress(Address("akka.tcp",
>       system.name, InetAddress.getLocalHost.getHostAddress, 2552))
>
>     val props = Props.create(classOf[SampleActorReceiver], url)
>
>     val lines = ssc.actorStream[String](props, "SampleReceiver")
>
>     lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
>
>     ssc.start()
>     ssc.awaitTermination()
>
>     system.awaitTermination()
>
> }
>
> There appears to be an issue with the Creator function being improperly
> serialized when Spark sends the function from the driver to the receiver.
> Are there any workarounds, or is there something wrong with my code? By the
> way, it works if I run it locally using spark-submit, but fails if I submit
> it on the CDH cluster.
>
> Thanks in advance.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM
Twitter: @akkateam

