[
https://issues.apache.org/jira/browse/SPARK-7237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andrew Or closed SPARK-7237.
----------------------------
Resolution: Fixed
Fix Version/s: 1.4.0
> Many user provided closures are not actually cleaned
> ----------------------------------------------------
>
> Key: SPARK-7237
> URL: https://issues.apache.org/jira/browse/SPARK-7237
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.0.0
> Reporter: Andrew Or
> Assignee: Andrew Or
> Fix For: 1.4.0
>
>
> It appears that many operations throughout Spark do not actually clean
> the closures provided by the user.
> Simple reproduction:
> {code}
> def test(): Unit = {
>   sc.parallelize(1 to 10).mapPartitions { iter => return; iter }.collect()
> }
> {code}
> The inner closure is clearly not serializable, so when we serialize it we
> should expect the ClosureCleaner to fail fast and complain loudly about the
> return statement. Instead, we get a mysterious stack trace:
> {code}
> java.io.NotSerializableException: java.lang.Object
> Serialization stack:
>   - object not serializable (class: java.lang.Object, value: java.lang.Object@6db4b914)
>   - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
>   - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
>   - field (class: org.apache.spark.rdd.RDD$$anonfun$14, name: f$4, type: interface scala.Function1)
>   - object (class org.apache.spark.rdd.RDD$$anonfun$14, <function3>)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:314)
> {code}
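> Where does that java.lang.Object come from? Below is a rough sketch (hand-written,
> not what scalac literally emits) of how the return in test() is desugared:
> the enclosing method allocates a fresh key object, the closure captures it,
> and the return becomes a throw of NonLocalReturnControl carrying that key.
> The captured key is the plain, non-serializable java.lang.Object behind the
> nonLocalReturnKey1$1 field in the serialization stack above.
> {code}
> import scala.runtime.NonLocalReturnControl
>
> def testDesugared(): Unit = {
>   val nonLocalReturnKey1 = new Object  // fresh per call; not Serializable
>   try {
>     sc.parallelize(1 to 10).mapPartitions { iter =>
>       // what the original `return` compiles into:
>       throw new NonLocalReturnControl[Unit](nonLocalReturnKey1, ())
>     }.collect()
>   } catch {
>     // the non-local return unwinds back to this method's frame
>     case e: NonLocalReturnControl[_] if e.key eq nonLocalReturnKey1 => ()
>   }
> }
> {code}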
> Why didn't the ClosureCleaner catch this? If you look at the code for
> mapPartitions, you'll notice that we never explicitly clean the closure
> passed in by the user. We wrap it in another closure and clean only the
> outer one:
> {code}
> def mapPartitions[U: ClassTag](
>     f: Iterator[T] => Iterator[U],
>     preservesPartitioning: Boolean = false): RDD[U] = {
>   val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
>   new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
> }
> {code}
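> To see what the wrapper actually captures, here is a minimal standalone
> sketch (plain Scala, not Spark source; the object and method names are
> mine). The compiler turns the captured f into a field of the wrapper
> closure's class:
> {code}
> object CapturedFieldDemo {
>   // Print the fields the compiler generated for a closure's captures.
>   def describeClosure(closure: AnyRef): Unit = {
>     closure.getClass.getDeclaredFields.foreach { fld =>
>       println(fld.getName + ": " + fld.getType)
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val f: Iterator[Int] => Iterator[Int] = iter => iter
>     // The wrapper closure, analogous to func in mapPartitions above:
>     val func = (index: Int, iter: Iterator[Int]) => f(iter)
>     describeClosure(func)
>     // Prints something like "f$1: interface scala.Function1"; cleaning
>     // func without recursing into this field leaves f untouched.
>   }
> }
> {code}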
> This is not sufficient, however: the user-provided closure ends up as a
> field of the outer closure (the f$1-style field shown in the sketch above),
> and that inner closure never gets cleaned. If we rewrite the above to clean
> the inner closure preemptively, as we have done in other places:
> {code}
> def mapPartitions[U: ClassTag](
>     f: Iterator[T] => Iterator[U],
>     preservesPartitioning: Boolean = false): RDD[U] = {
>   val cleanedFunc = clean(f)
>   new MapPartitionsRDD(
>     this,
>     (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedFunc(iter),
>     preservesPartitioning)
> }
> {code}
> Then running the test() example above yields the exception we expect:
> {code}
> org.apache.spark.SparkException: Return statements aren't allowed in Spark closures
>   at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:357)
>   at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
>   at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:215)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:1759)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:640)
> {code}
> It seems to me that we simply forgot to do this in a few places (e.g.
> mapPartitions, keyBy, aggregateByKey); in other similar places we already
> do this correctly (e.g. groupBy, combineByKey, zipPartitions).
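> For reference, a simplified sketch of the pattern the correct call sites
> follow (modeled on groupBy in RDD.scala, with the signature trimmed): clean
> the user function before any wrapper closure captures it.
> {code}
> def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] = {
>   val cleanF = sc.clean(f)  // clean the user closure up front...
>   this.map(t => (cleanF(t), t)).groupByKey()  // ...then wrappers capture cleanF
> }
> {code}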