[ 
https://issues.apache.org/jira/browse/SPARK-7237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Wendell updated SPARK-7237:
-----------------------------------
    Component/s: Spark Core

> Many user provided closures are not actually cleaned
> ----------------------------------------------------
>
>                 Key: SPARK-7237
>                 URL: https://issues.apache.org/jira/browse/SPARK-7237
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>            Reporter: Andrew Or
>            Assignee: Andrew Or
>
> It appears that many operations throughout Spark do not actually clean
> the closures provided by the user.
> Simple reproduction:
> {code}
> def test(): Unit = {
>   sc.parallelize(1 to 10).mapPartitions { iter => return; iter }.collect()
> }
> {code}
> Clearly, the inner closure is not serializable, so when we try to serialize
> it we should expect the ClosureCleaner to fail fast and complain loudly
> about the return statement. Instead, we get a mysterious stack trace:
> {code}
> java.io.NotSerializableException: java.lang.Object
> Serialization stack:
>       - object not serializable (class: java.lang.Object, value: java.lang.Object@6db4b914)
>       - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
>       - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
>       - field (class: org.apache.spark.rdd.RDD$$anonfun$14, name: f$4, type: interface scala.Function1)
>       - object (class org.apache.spark.rdd.RDD$$anonfun$14, <function3>)
>       at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>       at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>       at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:314)
> {code}
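> (For context: the non-serializable java.lang.Object above is the key object
> the Scala compiler generates for the non-local return, and it ends up
> captured as a field of the closure. The snippet below is a rough,
> hand-written approximation of that lowering, not actual compiler output,
> and assumes a Scala 2 shell with an existing sc.)
> {code}
> def testDesugared(): Unit = {
>   // Illustrative stand-in for the compiler-generated nonLocalReturnKey1$1:
>   // a plain java.lang.Object captured by the closure, hence not serializable.
>   val nonLocalReturnKey = new Object
>   try {
>     sc.parallelize(1 to 10).mapPartitions { iter =>
>       // `return` is lowered into a control-flow exception tagged with the key
>       throw new scala.runtime.NonLocalReturnControl(nonLocalReturnKey, ()); iter
>     }.collect()
>   } catch {
>     case e: scala.runtime.NonLocalReturnControl[_] if e.key eq nonLocalReturnKey =>
>   }
> }
> {code}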
> What might have caused this? If you look at the code for mapPartitions,
> you'll notice that we never explicitly clean the closure passed in by the
> user. Instead, we wrap it in another closure and clean only the outer one:
> {code}
> def mapPartitions[U: ClassTag](
>       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
>     val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
>     new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
>   }
> {code}
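> To make the capture concrete, here is a small hand-written check (a sketch
> only; the synthetic field names vary by Scala version, but compare the
> output with the "name: f$4, type: interface scala.Function1" entry in the
> serialization stack above):
> {code}
> import org.apache.spark.TaskContext
>
> def printWrapperFields(f: Iterator[Int] => Iterator[Int]): Unit = {
>   // Same shape as the wrapper that mapPartitions builds around the user's f
>   val func = (context: TaskContext, index: Int, iter: Iterator[Int]) => f(iter)
>   // The wrapper's class holds the user closure as a synthetic field, so
>   // cleaning only the wrapper never touches f's own captured state.
>   func.getClass.getDeclaredFields.foreach(fld => println(s"${fld.getName}: ${fld.getType}"))
> }
>
> printWrapperFields(identity)
> {code}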
> This is not sufficient, however, because the user-provided closure is itself
> a field of the outer closure, and this inner closure never gets cleaned. If
> we rewrite the above to clean the inner closure preemptively, as we already
> do in other places:
> {code}
> def mapPartitions[U: ClassTag](
>       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
>     val cleanedFunc = clean(f)
>     new MapPartitionsRDD(
>       this,
>       (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedFunc(iter),
>       preservesPartitioning)
>   }
> {code}
> Then running the test() example above yields the exception we would expect:
> {code}
> org.apache.spark.SparkException: Return statements aren't allowed in Spark closures
>       at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:357)
>       at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
>       at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
>       at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:215)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:1759)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:640)
> {code}
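> A regression test along these lines could lock in the fix (a rough sketch
> only: the suite setup, config, and test name are illustrative, not existing
> Spark test code):
> {code}
> import org.apache.spark.{SparkConf, SparkContext, SparkException}
> import org.scalatest.FunSuite
>
> class ClosureCleaningSuite extends FunSuite {
>   test("mapPartitions cleans the user-provided closure") {
>     val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
>     try {
>       val e = intercept[SparkException] {
>         // Same shape as the reproduction above: a closure with a return statement
>         def run(): Unit = {
>           sc.parallelize(1 to 10).mapPartitions { iter => return; iter }.collect()
>         }
>         run()
>       }
>       assert(e.getMessage.contains("Return statements aren't allowed in Spark closures"))
>     } finally {
>       sc.stop()
>     }
>   }
> }
> {code}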
> It seems to me that we simply forgot to do this in a few places (e.g. 
> mapPartitions, keyBy, aggregateByKey), because in other similar places we do 
> this correctly (e.g. groupBy, combineByKey, zipPartitions).
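> For example, the analogous fix for keyBy would look something like the
> following (a sketch only; the actual method body in RDD.scala may differ):
> {code}
> def keyBy[K](f: T => K): RDD[(K, T)] = {
>     val cleanedF = sc.clean(f)       // clean the user-provided closure itself...
>     map(x => (cleanedF(x), x))       // ...before wrapping it in another closure
>   }
> {code}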



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
