[ 
https://issues.apache.org/jira/browse/SPARK-7960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317176#comment-15317176
 ] 

UTKARSH BHATNAGAR commented on SPARK-7960:
------------------------------------------

DEBUG Output:

16/06/06 20:29:00 DEBUG ClosureCleaner: +++ Cleaning closure <function1> 
(org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1) +++
16/06/06 20:29:00 DEBUG ClosureCleaner:  + declared fields: 2
16/06/06 20:29:00 DEBUG ClosureCleaner:      public static final long 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.serialVersionUID
16/06/06 20:29:00 DEBUG ClosureCleaner:      private final 
org.apache.spark.api.java.function.Function 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1
16/06/06 20:29:00 DEBUG ClosureCleaner:  + declared methods: 1
16/06/06 20:29:00 DEBUG ClosureCleaner:      public final java.lang.Object 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(java.lang.Object)
16/06/06 20:29:00 DEBUG ClosureCleaner:  + inner classes: 0
16/06/06 20:29:00 DEBUG ClosureCleaner:  + outer classes: 0
16/06/06 20:29:00 DEBUG ClosureCleaner:  + outer objects: 0
16/06/06 20:29:00 DEBUG ClosureCleaner:  + populating accessed fields because 
this is the starting closure
16/06/06 20:29:00 DEBUG ClosureCleaner:  + fields accessed by starting closure: 0
16/06/06 20:29:00 DEBUG ClosureCleaner:  + there are no enclosing objects!
16/06/06 20:29:00 DEBUG ClosureCleaner:  +++ closure <function1> 
(org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1) is now 
cleaned +++
16/06/06 20:29:00 DEBUG MappedDStream: MappedDStream.writeObject used
16/06/06 20:29:00 DEBUG JobGenerator: Got event DoCheckpoint(1465244940000 ms,false)
16/06/06 20:29:00 ERROR JobScheduler: Error generating jobs for time 1465244940000 ms
org.apache.spark.SparkException: Task not serializable
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.map(RDD.scala:323)
        at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
        at scala.Option.map(Option.scala:145)
        at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
        at scala.util.Try$.apply(Try.scala:161)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.io.NotSerializableException: Object of 
org.apache.spark.streaming.dstream.MappedDStream is being serialized  possibly 
as a part of closure of an RDD operation. This is because  the DStream object 
is being referred to from within the closure.  Please rewrite the RDD operation 
inside this DStream to avoid this.  This has been enforced to avoid bloating of 
Spark tasks  with unnecessary objects.
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:536)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
        at 
org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:523)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 41 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.map(RDD.scala:323)
        at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
        at scala.Option.map(Option.scala:145)
        at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
        at scala.util.Try$.apply(Try.scala:161)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.io.NotSerializableException: Object of 
org.apache.spark.streaming.dstream.MappedDStream is being serialized  possibly 
as a part of closure of an RDD operation. This is because  the DStream object 
is being referred to from within the closure.  Please rewrite the RDD operation 
inside this DStream to avoid this.  This has been enforced to avoid bloating of 
Spark tasks  with unnecessary objects.
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:536)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
        at 
org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:523)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 41 more
16/06/06 20:29:00 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook

> Serialization problem when multiple receivers are specified in a loop
> ---------------------------------------------------------------------
>
>                 Key: SPARK-7960
>                 URL: https://issues.apache.org/jira/browse/SPARK-7960
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.1, 1.4.0
>            Reporter: Nishkam Ravi
>
> The following code works fine:
>      var lines_array:Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4);
>      lines_array(0) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt))), "SampleReceiver0")
>      lines_array(1) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+1))), "SampleReceiver1")
>      lines_array(2) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+2))), "SampleReceiver2")
>      lines_array(3) = ssc.actorStream[String](
>        Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>          host, port.toInt+3))), "SampleReceiver3")
> Fails when specified as a loop:
>      var lines_array:Array[ReceiverInputDStream[String]] = new Array[ReceiverInputDStream[String]](4);
>      var i = 0;
>      for(i <- 0 to 3){
>        lines_array(i) = ssc.actorStream[String](
>          Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
>            host, port.toInt + i))), "SampleReceiver" + i.toString)
>      }
> Exception stack trace:
> java.io.NotSerializableException: Object of 
> org.apache.spark.streaming.dstream.PluggableInputDStream is being serialized  
> possibly as a part of closure of an RDD operation. This is because  the 
> DStream object is being referred to from within the closure.  Please rewrite 
> the RDD operation inside this DStream to avoid this.  This has been enforced 
> to avoid bloating of Spark tasks  with unnecessary objects.
>         at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:421)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
>         at 
> org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:408)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

Reply via email to