[ https://issues.apache.org/jira/browse/SPARK-7960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565956#comment-14565956 ]

Sean Owen commented on SPARK-7960:
----------------------------------

Yeah, I've seen problems of this form, where the closure captures things you
don't expect (or that it shouldn't) depending on how the code is written. This
might already be resolved by recent updates to the closure cleaner, so I think
this is probably a duplicate of other issues rather than something specific to
streaming.
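The kind of unexpected capture described above can be illustrated without Spark. What follows is a minimal sketch under several assumptions. `FakeStream`, `ClosureCaptureDemo`, `leakyFactories`, `urlFactory` and `cleanFactories` are hypothetical names. Plain Java serialization stands in for the way Spark ships receivers and closures. The sketch also assumes Scala 2.12 or later, where function literals are serializable. It does not reproduce the exact capture path in the reported loop, which depends on how the Scala version used by Spark 1.3/1.4 compiled nested closures. It only shows the general effect: a closure that refers to an array of non-serializable streams cannot be serialized, while one built in a small helper that receives only plain values can.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a DStream: deliberately NOT Serializable.
final class FakeStream(val name: String)

// Extends Serializable so that, if a compiler happens to capture the
// enclosing object in a closure, that capture is not itself the problem.
object ClosureCaptureDemo extends Serializable {

  // True if plain Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Leaky shape: each factory closure refers to `streams`, so serializing
  // the closure also tries to serialize every FakeStream in the array.
  def leakyFactories(host: String, port: Int): Seq[() => String] = {
    val streams = new Array[FakeStream](4)
    (0 to 3).map { i =>
      streams(i) = new FakeStream("SampleReceiver" + i)
      val factory: () => String = () =>
        s"akka.tcp://test@$host:${port + i}/user/FeederActor (${streams.length} streams)"
      factory
    }
  }

  // Hypothetical helper: the closure built here can only see its plain
  // String/Int parameters, so that is all it captures.
  def urlFactory(host: String, port: Int): () => String =
    () => "akka.tcp://test@%s:%s/user/FeederActor".format(host, port)

  // Same loop, but each factory comes from urlFactory, not the loop body.
  def cleanFactories(host: String, port: Int): Seq[() => String] = {
    val streams = new Array[FakeStream](4)
    (0 to 3).map { i =>
      streams(i) = new FakeStream("SampleReceiver" + i)
      urlFactory(host, port + i)
    }
  }
}
```

For example, the third clean factory, `ClosureCaptureDemo.cleanFactories("localhost", 9999)(2)`, returns `"akka.tcp://test@localhost:10001/user/FeederActor"` when called. The analogous change for the reported code would be building each receiver's `Props` inside a helper that takes only `host` and the port number, though this sketch does not confirm that doing so avoids the failure on the affected Spark versions.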

> Serialization problem when multiple receivers are specified in a loop
> ---------------------------------------------------------------------
>
>                 Key: SPARK-7960
>                 URL: https://issues.apache.org/jira/browse/SPARK-7960
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.1, 1.4.0
>            Reporter: Nishkam Ravi
>
> The following code works fine:
>      var lines_array:Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4);
>      lines_array(0) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt))), "SampleReceiver0")
>      lines_array(1) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+1))), "SampleReceiver1")
>      lines_array(2) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+2))), "SampleReceiver2")
>      lines_array(3) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+3))), "SampleReceiver3")
> The same code fails when it is written as a loop:
>      var lines_array:Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4);
>      var i = 0;
>      for(i <- 0 to 3){
>        lines_array(i) = ssc.actorStream[String](
>          Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>            host, port.toInt + i))), "SampleReceiver" + i.toString)
>      }
> Exception stack trace:
> java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.PluggableInputDStream is being serialized  possibly as a part of closure of an RDD operation. This is because  the DStream object is being referred to from within the closure.  Please rewrite the RDD operation inside this DStream to avoid this.  This has been enforced to avoid bloating of Spark tasks  with unnecessary objects.
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:421)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
>         at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:408)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
