[ https://issues.apache.org/jira/browse/SPARK-7237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Or updated SPARK-7237:
-----------------------------
    Description: 
It appears that many operations throughout Spark do not actually clean the
closures provided by the user.

Simple reproduction:
{code}
def test(): Unit = {
  sc.parallelize(1 to 10).mapPartitions { iter => return; iter }.collect()
}
{code}
Clearly, the inner closure is not serializable, and when we serialize it we
should expect the ClosureCleaner to fail fast and complain loudly about the
return statement. Instead, we get a mysterious stack trace:
{code}
java.io.NotSerializableException: java.lang.Object
Serialization stack:
        - object not serializable (class: java.lang.Object, value: java.lang.Object@6db4b914)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
        - field (class: org.apache.spark.rdd.RDD$$anonfun$14, name: f$4, type: interface scala.Function1)
        - object (class org.apache.spark.rdd.RDD$$anonfun$14, <function3>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:314)
{code}
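
For context, the java.lang.Object at the top of the serialization stack is the
key Scala uses to implement a non-local return: the compiler allocates a fresh
Object in the enclosing method and rewrites the return statement into a thrown
scala.runtime.NonLocalReturnControl carrying that key, so the closure must
capture the key, and a bare java.lang.Object is not serializable. A rough
hand-written approximation of what test() compiles to (illustrative only, not
actual compiler output):
{code}
import scala.runtime.NonLocalReturnControl

// Approximate desugaring of test() above. `nonLocalReturnKey` plays the role of
// the compiler-generated nonLocalReturnKey1$1 field seen in the serialization stack.
def test(): Unit = {
  val nonLocalReturnKey = new Object  // captured by the closure, not serializable
  try {
    sc.parallelize(1 to 10).mapPartitions { iter =>
      // the original `return` becomes a throw keyed on the captured object
      throw new NonLocalReturnControl(nonLocalReturnKey, ())
      iter
    }.collect()
  } catch {
    // the enclosing method catches its own key, which completes the "return"
    case nlr: NonLocalReturnControl[_] if nlr.key eq nonLocalReturnKey => ()
  }
}
{code}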

What might have caused this? If you look at the code for mapPartitions, you'll
notice that we never explicitly clean the closure passed in by the user.
Instead, we merely wrap it in another closure and clean only the outer one:
{code}
def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }
{code}

This is not sufficient, however, because the user-provided closure is actually
a field of the outer closure, and this inner closure never gets cleaned. If we
rewrite the above to clean the inner closure preemptively, as we have done in
other places:

{code}
def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    val cleanedFunc = clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedFunc(iter),
      preservesPartitioning)
  }
{code}

then running the test() example above produces the exception we would expect:
{code}
org.apache.spark.SparkException: Return statements aren't allowed in Spark closures
        at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:357)
        at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
        at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:215)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1759)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:640)
{code}
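
As an aside, this also explains why the unfixed version fails with the
mysterious NotSerializableException above rather than with this error: Java
serialization of the wrapper closure follows its fields into the user-provided
function (and everything it captured, such as the non-local-return key),
whereas the return-statement scan appears to be applied only to the closure
object that clean() is handed. A small illustration of the object graph,
using made-up names (userFunc and wrapper are not Spark identifiers):
{code}
import org.apache.spark.TaskContext

// Illustration only: `userFunc` and `wrapper` are invented names.
val userFunc: Iterator[Int] => Iterator[Int] = iter => iter.map(_ + 1)

// The wrapper captures `userFunc` as a field of its anonymous class; this is
// the `f$4` field shown in the serialization stack earlier in this report.
val wrapper = (context: TaskContext, index: Int, iter: Iterator[Int]) => userFunc(iter)

// Serializing `wrapper` therefore serializes `userFunc` transitively, while a
// bytecode scan of wrapper.getClass alone never inspects userFunc's class.
{code}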

It seems to me that we simply forgot to do this in a few places (e.g. 
mapPartitions, keyBy, aggregateByKey), because in other similar places we do 
this correctly (e.g. groupBy, combineByKey, zipPartitions).
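
To make the pattern concrete, here is a sketch of the same fix applied to
keyBy, one of the operators listed above. This is illustrative only and may
not match the exact code in RDD.scala, but the idea is the same: clean the
user-provided function up front and let the shipped closure capture only the
cleaned reference.
{code}
// Illustrative sketch of the fix pattern for keyBy (not necessarily the exact
// code in RDD.scala): clean the user function first, then close over it.
def keyBy[K](f: T => K): RDD[(K, T)] = {
  val cleanedF = sc.clean(f)
  map(x => (cleanedF(x), x))
}
{code}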



> Many user provided closures are not actually cleaned
> ----------------------------------------------------
>
>                 Key: SPARK-7237
>                 URL: https://issues.apache.org/jira/browse/SPARK-7237
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>            Reporter: Andrew Or
>            Assignee: Andrew Or


