[
https://issues.apache.org/jira/browse/SPARK-1785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tathagata Das closed SPARK-1785.
--------------------------------
Resolution: Fixed
> Streaming requires receivers to be serializable
> -----------------------------------------------
>
> Key: SPARK-1785
> URL: https://issues.apache.org/jira/browse/SPARK-1785
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.9.0
> Reporter: Hari Shreedharan
>
> When the ReceiverTracker starts the receivers, it creates a temporary RDD to
> send the receiver instances over to the workers, where they are then started
> via the startReceivers method. This means that the receivers themselves have
> to be serializable. In the case of the Flume receiver, the Avro IPC components
> are not serializable, which causes an error like this:
> {code}
> Exception in thread "Thread-46" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.avro.ipc.specific.SpecificResponder
> 	- field (class "org.apache.spark.streaming.flume.FlumeReceiver", name: "responder", type: "class org.apache.avro.ipc.specific.SpecificResponder")
> 	- object (class "org.apache.spark.streaming.flume.FlumeReceiver", org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36)
> 	- element of array (index: 0)
> 	- array (class "[Lorg.apache.spark.streaming.receiver.Receiver;", size: 1)
> 	- field (class "scala.collection.mutable.WrappedArray$ofRef", name: "array", type: "class [Ljava.lang.Object;")
> 	- object (class "scala.collection.mutable.WrappedArray$ofRef", WrappedArray(org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36))
> 	- field (class "org.apache.spark.rdd.ParallelCollectionPartition", name: "values", type: "interface scala.collection.Seq")
> 	- custom writeObject data (class "org.apache.spark.rdd.ParallelCollectionPartition")
> 	- object (class "org.apache.spark.rdd.ParallelCollectionPartition", org.apache.spark.rdd.ParallelCollectionPartition@691)
> 	- writeExternal data
> 	- root object (class "org.apache.spark.scheduler.ResultTask", ResultTask(0, 0))
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
> 	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
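> For illustration, here is a hypothetical, simplified sketch of the pattern
> described above (not the actual ReceiverTracker source; names are approximate):
> the receiver instances themselves are the data of the temporary RDD, so they
> get serialized into each task's ParallelCollectionPartition, which is exactly
> where the trace above fails.
> {code}
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.receiver.Receiver
> 
> // Simplified sketch, not the actual ReceiverTracker code: start every receiver
> // by shipping the receiver instances themselves through a temporary RDD.
> def startAllReceivers(ssc: StreamingContext, receivers: Seq[Receiver[_]]): Unit = {
>   // One receiver per partition: the receiver objects are the RDD's data, so they
>   // are serialized into the task partitions when the job is submitted. This task
>   // serialization on the driver is what fails when a receiver holds a
>   // non-serializable field (e.g. FlumeReceiver's Avro responder).
>   val tempRDD = ssc.sparkContext.makeRDD(receivers, receivers.size)
> 
>   // Runs on the workers: pull the single receiver out of the partition and start it.
>   val startReceiver = (iterator: Iterator[Receiver[_]]) => {
>     iterator.next().onStart()
>   }
> 
>   ssc.sparkContext.runJob(tempRDD, startReceiver)
> }
> {code}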
> A way out of this is to simply send the class name (or .class) to the workers
> in the tempRDD and have the workers instantiate and start the receiver.
> My analysis may be wrong, but if it makes sense, I will submit a PR to fix
> this.
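> Roughly, that workaround could look like the following hypothetical sketch (not
> an actual patch): the temporary RDD carries only class names, which are plain
> serializable Strings, and each worker instantiates and starts its receiver
> locally, so fields like Avro's SpecificResponder never have to cross the wire.
> {code}
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.receiver.Receiver
> 
> // Hypothetical sketch of the proposed workaround, not an actual patch.
> def startAllReceiversByClassName(ssc: StreamingContext, receivers: Seq[Receiver[_]]): Unit = {
>   val receiverClassNames: Seq[String] = receivers.map(_.getClass.getName)
>   val tempRDD = ssc.sparkContext.makeRDD(receiverClassNames, receiverClassNames.size)
> 
>   ssc.sparkContext.runJob(tempRDD, (iter: Iterator[String]) => {
>     // Assumes a no-argument constructor, which the real FlumeReceiver does not
>     // have; in practice a serializable factory (or the constructor arguments)
>     // would have to be shipped alongside the class name.
>     val receiver = Class.forName(iter.next()).newInstance().asInstanceOf[Receiver[_]]
>     receiver.onStart()
>   })
> }
> {code}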
--
This message was sent by Atlassian JIRA
(v6.2#6252)