Hari Shreedharan created SPARK-1785:
---------------------------------------

             Summary: Streaming requires receivers to be serializable
                 Key: SPARK-1785
                 URL: https://issues.apache.org/jira/browse/SPARK-1785
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.9.0
            Reporter: Hari Shreedharan


When the ReceiverTracker starts the receivers, it creates a temporary RDD to
send the receivers over to the workers. They are then started on the workers
using the startReceivers method.
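
For reference, that code path looks roughly like the following. This is a simplified sketch from memory, not the exact 0.9 source (the real code wraps the receiver in a supervisor/executor before starting it), but it shows why the receiver instances themselves go through serialization:
{code}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver

// Simplified sketch of what the tracker does: the Receiver objects themselves
// become the elements of a small RDD, one partition per receiver, so they have
// to survive Java serialization to reach the workers.
def startReceivers(ssc: StreamingContext, receivers: Seq[Receiver[_]]): Unit = {
  val tempRDD = ssc.sparkContext.makeRDD(receivers, receivers.size)

  // Runs on the workers: pull the receiver out of its partition and start it.
  val startReceiver = (iterator: Iterator[Receiver[_]]) => {
    val receiver = iterator.next()
    receiver.onStart()
  }

  // Submitting this job is what serializes the receivers and triggers the
  // NotSerializableException below.
  ssc.sparkContext.runJob(tempRDD, startReceiver)
}
{code}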

This appears to mean that the receivers themselves must be serializable. In the
case of the Flume receiver, the Avro IPC components are not serializable,
causing an error that looks like this:
{code}
Exception in thread "Thread-46" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.avro.ipc.specific.SpecificResponder
        - field (class "org.apache.spark.streaming.flume.FlumeReceiver", name: "responder", type: "class org.apache.avro.ipc.specific.SpecificResponder")
        - object (class "org.apache.spark.streaming.flume.FlumeReceiver", org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36)
        - element of array (index: 0)
        - array (class "[Lorg.apache.spark.streaming.receiver.Receiver;", size: 1)
        - field (class "scala.collection.mutable.WrappedArray$ofRef", name: "array", type: "class [Ljava.lang.Object;")
        - object (class "scala.collection.mutable.WrappedArray$ofRef", WrappedArray(org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36))
        - field (class "org.apache.spark.rdd.ParallelCollectionPartition", name: "values", type: "interface scala.collection.Seq")
        - custom writeObject data (class "org.apache.spark.rdd.ParallelCollectionPartition")
        - object (class "org.apache.spark.rdd.ParallelCollectionPartition", org.apache.spark.rdd.ParallelCollectionPartition@691)
        - writeExternal data
        - root object (class "org.apache.spark.scheduler.ResultTask", ResultTask(0, 0))
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
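
The failure itself is just standard Java serialization behavior: a Serializable class that holds a field of a non-Serializable type fails at writeObject time. A minimal, Spark-free illustration (FakeReceiver/FakeResponder are hypothetical stand-ins for FlumeReceiver and SpecificResponder):
{code}
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for org.apache.avro.ipc.specific.SpecificResponder: not Serializable.
class FakeResponder

// Stand-in for FlumeReceiver: Serializable itself, but it drags the
// non-serializable responder field into the serialized object graph.
class FakeReceiver extends Serializable {
  val responder = new FakeResponder
}

object SerializationDemo extends App {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try {
    // Essentially what happens when the receiver ends up inside the temporary
    // RDD's partition data and the task is serialized for the workers.
    out.writeObject(new FakeReceiver)
  } catch {
    case e: NotSerializableException => println(s"Fails as expected: $e")
  }
}
{code}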

A way out of this is to send only the class name (or the .class object) to the
workers in the tempRDD, and have the workers instantiate and start the
receiver, roughly as sketched below.
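
This is only a rough sketch of the idea, not a working patch: it assumes the receiver can be re-created on the worker with a no-arg constructor, whereas real receivers (e.g. FlumeReceiver) would also need their constructor arguments shipped in some serializable form:
{code}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver

// Ship Class objects (or fully-qualified class names) instead of receiver
// instances; java.lang.Class and String are trivially serializable.
def startReceiversByClass(ssc: StreamingContext, receivers: Seq[Receiver[_]]): Unit = {
  val receiverClasses: Seq[Class[_ <: Receiver[_]]] =
    receivers.map(_.getClass.asInstanceOf[Class[_ <: Receiver[_]]])

  val tempRDD = ssc.sparkContext.makeRDD(receiverClasses, receiverClasses.size)

  // Instantiate and start the receiver on the worker instead of serializing
  // the driver-side instance.
  ssc.sparkContext.runJob(tempRDD, (it: Iterator[Class[_ <: Receiver[_]]]) => {
    val receiver = it.next().newInstance()
    receiver.onStart()
  })
}
{code}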

My analysis may be wrong, but if it makes sense, I will submit a PR to fix this.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
