[ https://issues.apache.org/jira/browse/SPARK-4171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191736#comment-14191736 ]
Shiti Saxena commented on SPARK-4171:
-------------------------------------
After applying the patch from https://github.com/apache/spark/pull/2158, I was able to replicate the issue in the REPL as well:
{noformat}
Spark context available as sc.
scala> import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
scala> import akka.actor.{Actor,Props}
import akka.actor.{Actor, Props}
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> Seconds(1)
res0: org.apache.spark.streaming.Duration = 1000 ms
scala> val ssc = new StreamingContext(sc, res0)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@1b1bca6c
scala> :pas
// Entering paste mode (ctrl-D to finish)
class EchoActor extends Actor with ActorHelper {
  override def receive = {
    case message => sender ! message
  }
}

object EchoActor {
  def props: Props = Props(new EchoActor())
}
defined class EchoActor
defined module EchoActor
scala> ssc.actorStream[String](EchoActor.props, "TestActor")
res1: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.PluggableInputDStream@56a620b4
scala> res1.print()
scala> ssc.start()
14/10/31 16:52:48 INFO ReceiverTracker: ReceiverTracker started
14/10/31 16:52:48 INFO ForEachDStream: metadataCleanupDelay = -1
14/10/31 16:52:48 INFO PluggableInputDStream: metadataCleanupDelay = -1
14/10/31 16:52:48 INFO PluggableInputDStream: Slide time = 1000 ms
14/10/31 16:52:48 INFO PluggableInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/10/31 16:52:48 INFO PluggableInputDStream: Checkpoint interval = null
14/10/31 16:52:48 INFO PluggableInputDStream: Remember duration = 1000 ms
14/10/31 16:52:48 INFO PluggableInputDStream: Initialized and validated org.apache.spark.streaming.dstream.PluggableInputDStream@56a620b4
14/10/31 16:52:48 INFO ForEachDStream: Slide time = 1000 ms
14/10/31 16:52:48 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/10/31 16:52:48 INFO ForEachDStream: Checkpoint interval = null
14/10/31 16:52:48 INFO ForEachDStream: Remember duration = 1000 ms
14/10/31 16:52:48 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@4a5a796
14/10/31 16:52:48 INFO ReceiverTracker: Starting 1 receivers
14/10/31 16:52:48 INFO SparkContext: Starting job: runJob at ReceiverTracker.scala:275
14/10/31 16:52:48 INFO DAGScheduler: Got job 0 (runJob at ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
14/10/31 16:52:48 INFO DAGScheduler: Final stage: Stage 0(runJob at ReceiverTracker.scala:275)
14/10/31 16:52:48 INFO DAGScheduler: Parents of final stage: List()
14/10/31 16:52:48 INFO DAGScheduler: Missing parents: List()
14/10/31 16:52:48 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which has no missing parents
14/10/31 16:52:48 INFO RecurringTimer: Started timer for JobGenerator at time 1414754569000
14/10/31 16:52:48 INFO JobGenerator: Started JobGenerator at 1414754569000 ms
14/10/31 16:52:48 INFO JobScheduler: Started JobScheduler
scala> 14/10/31 16:52:48 INFO MemoryStore: ensureFreeSpace(1216) called with curMem=0, maxMem=278302556
14/10/31 16:52:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1216.0 B, free 265.4 MB)
14/10/31 16:52:48 INFO TaskSchedulerImpl: Cancelling stage 0
14/10/31 16:52:48 INFO DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275
Exception in thread "Thread-38" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: $line19.$read$$iwC$$iwC$EchoActor$
    - field (class "$line19.$read$$iwC$$iwC$EchoActor$$anonfun$props$1", name: "$outer", type: "class $line19.$read$$iwC$$iwC$EchoActor$")
    - object (class "$line19.$read$$iwC$$iwC$EchoActor$$anonfun$props$1", <function0>)
    - element of array (index: 1)
    - array (class "[Ljava.lang.Object;", size: 32)
    - field (class "scala.collection.immutable.Vector", name: "display0", type: "class [Ljava.lang.Object;")
    - object (class "scala.collection.immutable.Vector", Vector(class $line19.$read$$iwC$$iwC$EchoActor, <function0>))
    - field (class "akka.actor.Props", name: "args", type: "interface scala.collection.immutable.Seq")
    - object (class "akka.actor.Props", Props(Deploy(,Config(SimpleConfigObject({})),NoRouter,NoScopeGiven,,),class akka.actor.TypedCreatorFunctionConsumer,Vector(class $line19.$read$$iwC$$iwC$EchoActor, <function0>)))
    - field (class "org.apache.spark.streaming.receiver.ActorReceiver", name: "org$apache$spark$streaming$receiver$ActorReceiver$$props", type: "class akka.actor.Props")
    - object (class "org.apache.spark.streaming.receiver.ActorReceiver", org.apache.spark.streaming.receiver.ActorReceiver@6de87506)
    - element of array (index: 0)
    - array (class "[Lorg.apache.spark.streaming.receiver.Receiver;", size: 1)
    - field (class "scala.collection.mutable.WrappedArray$ofRef", name: "array", type: "class [Ljava.lang.Object;")
    - object (class "scala.collection.mutable.WrappedArray$ofRef", WrappedArray(org.apache.spark.streaming.receiver.ActorReceiver@6de87506))
    - field (class "org.apache.spark.rdd.ParallelCollectionPartition", name: "values", type: "interface scala.collection.Seq")
    - custom writeObject data (class "org.apache.spark.rdd.ParallelCollectionPartition")
    - object (class "org.apache.spark.rdd.ParallelCollectionPartition", org.apache.spark.rdd.ParallelCollectionPartition@691)
    - field (class "org.apache.spark.scheduler.ResultTask", name: "partition", type: "interface org.apache.spark.Partition")
    - root object (class "org.apache.spark.scheduler.ResultTask", ResultTask(0, 0))
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:870)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}
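The serialization trace points at the culprit: {{Props(new EchoActor())}} uses Akka's by-name {{Props.apply}}, so the creator is captured as an anonymous {{Function0}} whose {{$outer}} field references the REPL-defined {{EchoActor}} companion ({{$line19.$read$$iwC$$iwC$EchoActor$}}), which is not serializable. Since {{actorStream}} ships the {{Props}} to the executor inside the {{ActorReceiver}}, the whole task fails to serialize. A minimal sketch of a possible workaround (untested here) is to build the {{Props}} from the {{Class}} object instead, so no closure is created:
{noformat}
import akka.actor.{Actor, Props}
import org.apache.spark.streaming.receiver.ActorHelper

class EchoActor extends Actor with ActorHelper {
  override def receive = {
    case message => sender ! message
  }
}

object EchoActor {
  // Props(classOf[EchoActor]) carries only the Class object and an empty
  // argument list, not a Function0 closing over the enclosing REPL wrapper,
  // so it should serialize cleanly when the receiver is shipped to an executor.
  def props: Props = Props(classOf[EchoActor])
}
{noformat}
Whether this fully avoids the {{$outer}} capture shown in the trace above would still need to be verified against the same patched build.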
> StreamingContext.actorStream throws serializationError
> ------------------------------------------------------
>
> Key: SPARK-4171
> URL: https://issues.apache.org/jira/browse/SPARK-4171
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.1.0, 1.2.0
> Reporter: Shiti Saxena
>
> I encountered this issue when I was working on https://issues.apache.org/jira/browse/SPARK-3872.
> Running the following test case on v1.1.0 and the master branch (v1.2.0-SNAPSHOT) throws a serialization error.
> {noformat}
> test("actor input stream") {
> // Set up the streaming context and input streams
> val ssc = new StreamingContext(conf, batchDuration)
> val networkStream = ssc.actorStream[String](EchoActor.props, "TestActor",
> // Had to pass the local value of port to prevent from closing over
> entire scope
> StorageLevel.MEMORY_AND_DISK)
> println("created actor")
> networkStream.print()
> ssc.start()
> Thread.sleep(3 * 1000)
> println("started stream")
> Thread.sleep(3*1000)
> logInfo("Stopping server")
> logInfo("Stopping context")
> ssc.stop()
> }
> {noformat}
> where EchoActor is defined as
> {noformat}
> class EchoActor extends Actor with ActorHelper {
>   override def receive = {
>     case message => sender ! message
>   }
> }
>
> object EchoActor {
>   def props: Props = Props(new EchoActor())
> }
> {noformat}