[ https://issues.apache.org/jira/browse/SPARK-7960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565956#comment-14565956 ]
Sean Owen commented on SPARK-7960:
----------------------------------
Yeah, I've seen problems of this form, where the closure captures things you
don't expect (or that it shouldn't) when code is written one way versus
another. This might be resolved by the recent updates to the closure cleaner,
so I think this is probably a duplicate of other issues rather than specific
to streaming.
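
To illustrate the kind of capture I mean, here is a minimal sketch using plain
JVM serialization only. It does not use Spark or its closure cleaner, and the
names ClosureCaptureSketch, Holder, capturing, clean and serializes are made up
for the example. The assumption is that a closure reading a field of its
enclosing object drags that whole object along, while copying the field into a
local val first leaves only the value in the closure.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {
  // Hypothetical stand-in for an object that should never be shipped with a task.
  // It is deliberately NOT Serializable.
  class Holder(val offset: Int) {
    // Reads the field directly, so the closure holds a reference to `this`.
    def capturing: Int => Int = x => x + offset

    // Copies the field into a local val first, so the closure holds only an Int.
    def clean: Int => Int = {
      val local = offset
      x => x + local
    }
  }

  // Returns true if Java serialization accepts the object, false if it hits
  // something that is not serializable.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val h = new Holder(1)
    println(s"capturing closure serializes: ${serializes(h.capturing)}")
    println(s"clean closure serializes: ${serializes(h.clean)}")
  }
}
```

Both functions compute the same thing; they differ only in what they reference,
which is why two equivalent-looking ways of writing the same code can behave
differently once the closure has to be serialized.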
> Serialization problem when multiple receivers are specified in a loop
> ---------------------------------------------------------------------
>
> Key: SPARK-7960
> URL: https://issues.apache.org/jira/browse/SPARK-7960
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.1, 1.4.0
> Reporter: Nishkam Ravi
>
> The following code works fine:
> var lines_array:Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4);
> lines_array(0) = ssc.actorStream[String](
>   Props(new SampleActorReceiver[String](
>     "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt))),
>   "SampleReceiver0")
> lines_array(1) = ssc.actorStream[String](
>   Props(new SampleActorReceiver[String](
>     "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt+1))),
>   "SampleReceiver1")
> lines_array(2) = ssc.actorStream[String](
>   Props(new SampleActorReceiver[String](
>     "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt+2))),
>   "SampleReceiver2")
> lines_array(3) = ssc.actorStream[String](
>   Props(new SampleActorReceiver[String](
>     "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt+3))),
>   "SampleReceiver3")
> Fails when specified as a loop:
> var lines_array:Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4);
> var i = 0;
> for(i <- 0 to 3){
>   lines_array(i) = ssc.actorStream[String](
>     Props(new SampleActorReceiver[String](
>       "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt + i))),
>     "SampleReceiver" + i.toString)
> }
> Exception stack trace:
> java.io.NotSerializableException: Object of
> org.apache.spark.streaming.dstream.PluggableInputDStream is being serialized
> possibly as a part of closure of an RDD operation. This is because the
> DStream object is being referred to from within the closure. Please rewrite
> the RDD operation inside this DStream to avoid this. This has been enforced
> to avoid bloating of Spark tasks with unnecessary objects.
> at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:421)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
> at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:408)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)