[ https://issues.apache.org/jira/browse/SPARK-7237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Patrick Wendell updated SPARK-7237:
-----------------------------------
    Component/s: Spark Core

> Many user provided closures are not actually cleaned
> ----------------------------------------------------
>
>                 Key: SPARK-7237
>                 URL: https://issues.apache.org/jira/browse/SPARK-7237
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>            Reporter: Andrew Or
>            Assignee: Andrew Or
>
> It appears that many operations throughout Spark do not actually clean the closures provided by the user.
> Simple reproduction:
> {code}
> def test(): Unit = {
>   sc.parallelize(1 to 10).mapPartitions { iter => return; iter }.collect()
> }
> {code}
> Clearly, the inner closure is not serializable, but when we serialize it we should expect the ClosureCleaner to fail fast and complain loudly about return statements. Instead, we get a mysterious stack trace:
> {code}
> java.io.NotSerializableException: java.lang.Object
> Serialization stack:
>     - object not serializable (class: java.lang.Object, value: java.lang.Object@6db4b914)
>     - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
>     - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
>     - field (class: org.apache.spark.rdd.RDD$$anonfun$14, name: f$4, type: interface scala.Function1)
>     - object (class org.apache.spark.rdd.RDD$$anonfun$14, <function3>)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:314)
> {code}
> What might have caused this? If you look at the code for mapPartitions, you'll notice that we never explicitly clean the closure passed in by the user. Instead, we merely wrap it in another closure and clean only the outer one:
> {code}
> def mapPartitions[U: ClassTag](
>     f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
>   val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
>   new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
> }
> {code}
> This is not sufficient, however, because the user-provided closure is actually a field of the outer closure, and that inner closure never gets cleaned.
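> To make the nesting concrete, here is a hypothetical snippet (not part of the Spark source; the exact field name depends on the Scala version and on how f is bound) showing that the wrapper compiles to a class holding the user's f behind a field:
> {code}
> import org.apache.spark.TaskContext
> // a stand-in for the user's closure
> val f: Iterator[Int] => Iterator[Int] = iter => iter
> // the wrapper that mapPartitions builds around it
> val func = (context: TaskContext, index: Int, iter: Iterator[Int]) => f(iter)
> // inspect the wrapper's fields: depending on how f is bound, this prints
> // a captured field such as "private final scala.Function1 ... f$1"
> // or an $outer reference to the enclosing scope that holds f
> func.getClass.getDeclaredFields.foreach(println)
> {code}
> This matches the serialization stack above, where the wrapper RDD$$anonfun$14 holds the user closure in its f$4 field. Cleaning func only removes unneeded references to func's own enclosing scopes; it never recurses into the f stored in that field, so whatever f captures (here, the nonLocalReturnKey1$1 Object generated for the return) rides along untouched.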
> If we rewrite the above by cleaning the inner closure preemptively, as we have done in other places:
> {code}
> def mapPartitions[U: ClassTag](
>     f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
>   val cleanedFunc = sc.clean(f)
>   new MapPartitionsRDD(
>     this,
>     (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedFunc(iter),
>     preservesPartitioning)
> }
> {code}
> Then running the test() example above yields the exception we would expect:
> {code}
> org.apache.spark.SparkException: Return statements aren't allowed in Spark closures
>   at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:357)
>   at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
>   at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:215)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:1759)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:640)
> {code}
> It seems to me that we simply forgot to do this in a few places (e.g. mapPartitions, keyBy, aggregateByKey), because in other similar places we do it correctly (e.g. groupBy, combineByKey, zipPartitions).
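> For comparison, here is a sketch of what the fix could look like for one of the missed operations, keyBy (the signature is from RDD.scala; the sc.clean line is my assumed change, following the pattern that groupBy already uses):
> {code}
> def keyBy[K](f: T => K): RDD[(K, T)] = {
>   // clean the user's closure directly, not just a wrapper around it
>   val cleanedF = sc.clean(f)
>   map(x => (cleanedF(x), x))
> }
> {code}
> With this shape, the closure that actually captures user state is the one handed to the cleaner, so return statements and unserializable captures are caught eagerly at the call site rather than surfacing as an opaque failure at task-serialization time.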