[ https://issues.apache.org/jira/browse/SPARK-4171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191736#comment-14191736 ]

Shiti Saxena commented on SPARK-4171:
-------------------------------------

After applying the patch from https://github.com/apache/spark/pull/2158, I was
able to reproduce the issue in the REPL as well:

{noformat}
Spark context available as sc.

scala> import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}

scala> import akka.actor.{Actor,Props}
import akka.actor.{Actor, Props}

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> Seconds(1)
res0: org.apache.spark.streaming.Duration = 1000 ms

scala> val ssc= new StreamingContext(sc,res0)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@1b1bca6c

scala> :pas
// Entering paste mode (ctrl-D to finish)

class EchoActor extends Actor with ActorHelper {
  override def receive = {
    case message => sender ! message
  }
}

object EchoActor {
  def props: Props = Props(new EchoActor())
}
defined class EchoActor
defined module EchoActor

scala> ssc.actorStream[String](EchoActor.props, "TestActor")
res1: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.PluggableInputDStream@56a620b4

scala> res1.print()

scala> ssc.start()
14/10/31 16:52:48 INFO ReceiverTracker: ReceiverTracker started
14/10/31 16:52:48 INFO ForEachDStream: metadataCleanupDelay = -1
14/10/31 16:52:48 INFO PluggableInputDStream: metadataCleanupDelay = -1
14/10/31 16:52:48 INFO PluggableInputDStream: Slide time = 1000 ms
14/10/31 16:52:48 INFO PluggableInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/10/31 16:52:48 INFO PluggableInputDStream: Checkpoint interval = null
14/10/31 16:52:48 INFO PluggableInputDStream: Remember duration = 1000 ms
14/10/31 16:52:48 INFO PluggableInputDStream: Initialized and validated org.apache.spark.streaming.dstream.PluggableInputDStream@56a620b4
14/10/31 16:52:48 INFO ForEachDStream: Slide time = 1000 ms
14/10/31 16:52:48 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
14/10/31 16:52:48 INFO ForEachDStream: Checkpoint interval = null
14/10/31 16:52:48 INFO ForEachDStream: Remember duration = 1000 ms
14/10/31 16:52:48 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@4a5a796
14/10/31 16:52:48 INFO ReceiverTracker: Starting 1 receivers
14/10/31 16:52:48 INFO SparkContext: Starting job: runJob at ReceiverTracker.scala:275
14/10/31 16:52:48 INFO DAGScheduler: Got job 0 (runJob at ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
14/10/31 16:52:48 INFO DAGScheduler: Final stage: Stage 0(runJob at ReceiverTracker.scala:275)
14/10/31 16:52:48 INFO DAGScheduler: Parents of final stage: List()
14/10/31 16:52:48 INFO DAGScheduler: Missing parents: List()
14/10/31 16:52:48 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which has no missing parents
14/10/31 16:52:48 INFO RecurringTimer: Started timer for JobGenerator at time 1414754569000
14/10/31 16:52:48 INFO JobGenerator: Started JobGenerator at 1414754569000 ms
14/10/31 16:52:48 INFO JobScheduler: Started JobScheduler

scala> 14/10/31 16:52:48 INFO MemoryStore: ensureFreeSpace(1216) called with curMem=0, maxMem=278302556
14/10/31 16:52:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1216.0 B, free 265.4 MB)
14/10/31 16:52:48 INFO TaskSchedulerImpl: Cancelling stage 0
14/10/31 16:52:48 INFO DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275
Exception in thread "Thread-38" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: $line19.$read$$iwC$$iwC$EchoActor$
        - field (class "$line19.$read$$iwC$$iwC$EchoActor$$anonfun$props$1", name: "$outer", type: "class $line19.$read$$iwC$$iwC$EchoActor$")
        - object (class "$line19.$read$$iwC$$iwC$EchoActor$$anonfun$props$1", <function0>)
        - element of array (index: 1)
        - array (class "[Ljava.lang.Object;", size: 32)
        - field (class "scala.collection.immutable.Vector", name: "display0", type: "class [Ljava.lang.Object;")
        - object (class "scala.collection.immutable.Vector", Vector(class $line19.$read$$iwC$$iwC$EchoActor, <function0>))
        - field (class "akka.actor.Props", name: "args", type: "interface scala.collection.immutable.Seq")
        - object (class "akka.actor.Props", Props(Deploy(,Config(SimpleConfigObject({})),NoRouter,NoScopeGiven,,),class akka.actor.TypedCreatorFunctionConsumer,Vector(class $line19.$read$$iwC$$iwC$EchoActor, <function0>)))
        - field (class "org.apache.spark.streaming.receiver.ActorReceiver", name: "org$apache$spark$streaming$receiver$ActorReceiver$$props", type: "class akka.actor.Props")
        - object (class "org.apache.spark.streaming.receiver.ActorReceiver", org.apache.spark.streaming.receiver.ActorReceiver@6de87506)
        - element of array (index: 0)
        - array (class "[Lorg.apache.spark.streaming.receiver.Receiver;", size: 1)
        - field (class "scala.collection.mutable.WrappedArray$ofRef", name: "array", type: "class [Ljava.lang.Object;")
        - object (class "scala.collection.mutable.WrappedArray$ofRef", WrappedArray(org.apache.spark.streaming.receiver.ActorReceiver@6de87506))
        - field (class "org.apache.spark.rdd.ParallelCollectionPartition", name: "values", type: "interface scala.collection.Seq")
        - custom writeObject data (class "org.apache.spark.rdd.ParallelCollectionPartition")
        - object (class "org.apache.spark.rdd.ParallelCollectionPartition", org.apache.spark.rdd.ParallelCollectionPartition@691)
        - field (class "org.apache.spark.scheduler.ResultTask", name: "partition", type: "interface org.apache.spark.Partition")
        - root object (class "org.apache.spark.scheduler.ResultTask", ResultTask(0, 0))
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:870)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}
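
The trace suggests the culprit is the by-name creator closure: Props(new EchoActor()) compiles to an anonymous function whose $outer field points at the enclosing EchoActor$ companion (wrapped in the REPL's $iwC objects), and it is that outer reference that Java serialization trips over when the ActorReceiver carrying the Props is shipped to an executor. Akka's documentation warns against the Props(new Actor) form outside of tests for exactly this reason. As a sketch of a possible workaround (not a fix for the underlying behaviour), building the Props from the class should avoid capturing anything from the enclosing scope:

{noformat}
import akka.actor.{Actor, Props}
import org.apache.spark.streaming.receiver.ActorHelper

class EchoActor extends Actor with ActorHelper {
  override def receive = {
    case message => sender ! message
  }
}

object EchoActor {
  // Props[EchoActor] (equivalently Props(classOf[EchoActor])) is built from
  // the Class object rather than a by-name creator closure, so no $outer
  // reference to the enclosing scope ends up inside the Props.
  def props: Props = Props[EchoActor]
}
{noformat}

With that variant the Props holds only the Class and an empty argument Vector, both of which serialize, so I would expect ssc.actorStream[String](EchoActor.props, "TestActor") to get past the NotSerializableException; I have not verified whether the REPL wrapper classes introduce any further problems.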

> StreamingContext.actorStream throws serializationError
> ------------------------------------------------------
>
>                 Key: SPARK-4171
>                 URL: https://issues.apache.org/jira/browse/SPARK-4171
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Shiti Saxena
>
> I encountered this issue when I was working on https://issues.apache.org/jira/browse/SPARK-3872.
> Running the following test case on v1.1.0 and the master branch (v1.2.0-SNAPSHOT) throws a serialization error.
> {noformat}
>  test("actor input stream") {
>     // Set up the streaming context and input streams
>     val ssc = new StreamingContext(conf, batchDuration)
>     val networkStream = ssc.actorStream[String](EchoActor.props, "TestActor",
>       // Had to pass the local value of port to prevent from closing over entire scope
>       StorageLevel.MEMORY_AND_DISK)
>     println("created actor")
>     networkStream.print()
>     ssc.start()
>     Thread.sleep(3 * 1000)
>     println("started stream")
>     Thread.sleep(3*1000)
>     logInfo("Stopping server")
>     logInfo("Stopping context")
>     ssc.stop()
>   }
> {noformat}
> where EchoActor is defined as:
> {noformat}
> class EchoActor extends Actor with ActorHelper {
>   override def receive = {
>     case message => sender ! message
>   }
> }
> object EchoActor {
>   def props: Props = Props(new EchoActor())
> }
> {noformat}


