[ https://issues.apache.org/jira/browse/SPARK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15125491#comment-15125491 ]
Placek edited comment on SPARK-9141 at 2/2/16 1:21 AM:
-------------------------------------------------------

Can it be that this problem still persists in Spark 1.6? I have a program that computes a transitive closure, and it visibly slows down with every iteration. filtered.explain() also suggests that everything is recomputed. The slowdown occurs even when the input is defined in such a way that no new rows are produced, i.e., filtered is always empty after a few initial iterations. (The code assumes that Edge is a simple case class with start and end fields.)

{code:java}
val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
  (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16),
  (16, 17), (17, 18), (18, 19), (19, 20), (20, 21), (21, 22), (22, 23),
  (23, 24), (24, 25), (25, 26), (26, 27), (27, 28), (23, 4), (4, 5), (5, 6),
  (6, 7), (7, 8), (8, 9), (9, 10), (10, 11), (11, 12), (12, 13))
//val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7))

var edges = sc
  .parallelize(e, N)
  .map(p => Edge(p._1, p._2))
  .toDF()
  .cache()

var filtered = edges
  .filter("start = 1")
  .distinct()
  .withColumnRenamed("start", "fStart")
  .withColumnRenamed("end", "fEnd")
  .cache()

var i = 0
while (i < 300) {
  i = i + 1
  println("\n i = " + i)
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("start", "fStart")
    .withColumnRenamed("end", "fEnd")
    .distinct
    .cache()
  filtered.show
}
{code}

I also sometimes get the following error, which I don't understand (it is non-deterministic; I only get it sometimes):

{code:java}
16/01/31 21:33:52 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
  at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
  at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
  at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
  ...
{code}
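One thing that may be worth trying in a loop like the one above (a sketch of my own, not a confirmed fix): rebuild each iteration's result from its RDD and schema so the logical plan does not keep growing, which is the same lineage-freezing trick mentioned in the issue description below. It assumes the sqlContext, edges, filtered, and Edge case class from the snippet above; freezeLineage is just a hypothetical helper name, not a Spark API.

{code:java}
import org.apache.spark.sql.DataFrame

// Hypothetical helper (not part of Spark): round-trips through the RDD so the
// resulting DataFrame's plan no longer references the whole iterative lineage.
def freezeLineage(df: DataFrame): DataFrame =
  sqlContext.createDataFrame(df.rdd, df.schema)

// Inside the loop, the new result would then be frozen before caching:
filtered = freezeLineage(
  filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("end", "fEnd")
    .distinct()
).cache()
{code}

Checkpointing the underlying RDD would be another way to cut the lineage, at the cost of writing to the checkpoint directory; whether either approach also avoids the driver-heartbeater serialization error above, I cannot say.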
> DataFrame recomputed instead of using cached parent.
> -----------------------------------------------------
>
>                 Key: SPARK-9141
>                 URL: https://issues.apache.org/jira/browse/SPARK-9141
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0, 1.4.1
>            Reporter: Nick Pritchard
>            Assignee: Michael Armbrust
>            Priority: Blocker
>              Labels: cache, dataframe
>             Fix For: 1.5.0
>
>
> As I understand, DataFrame.cache() is supposed to work the same as RDD.cache(), so that repeated operations on it will use the cached results and not recompute the entire lineage. However, it seems that some DataFrame operations (e.g. withColumn) change the underlying RDD lineage so that cache doesn't work as expected.
> Below is a Scala example that demonstrates this. First, I define two UDFs that use println so that it is easy to see when they are being called. Next, I create a simple data frame with one row and two columns. Next, I add a column, cache it, and call count() to force the computation. Lastly, I add another column, cache it, and call count().
> I would have expected the last statement to only compute the last column, since everything else was cached. However, because withColumn() changes the lineage, the whole data frame is recomputed.
> {code}
> // Example UDFs that println when called
> val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
> val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }
> // Initial dataset
> val df1 = sc.parallelize(Seq(("a", 1))).toDF("name", "value")
> // Add column by applying twice udf
> val df2 = df1.withColumn("twice", twice($"value"))
> df2.cache()
> df2.count() // prints Computed: twice(1)
> // Add column by applying triple udf
> val df3 = df2.withColumn("triple", triple($"value"))
> df3.cache()
> df3.count() // prints Computed: twice(1)\nComputed: triple(1)
> {code}
> I found a workaround, which helped me understand what was going on behind the scenes, but it doesn't seem like an ideal solution. Basically, I convert to RDD and then back to a DataFrame, which seems to freeze the lineage. The code below shows the workaround for creating the second data frame so that cache works as expected.
> {code}
> val df2 = {
>   val tmp = df1.withColumn("twice", twice($"value"))
>   sqlContext.createDataFrame(tmp.rdd, tmp.schema)
> }
> {code}
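For anyone checking whether a cached parent is actually being reused, a rough, version-dependent check (my own sketch, not part of the report) is to look for an in-memory scan node in the child's physical plan; the node is named InMemoryColumnarTableScan in Spark 1.x and InMemoryTableScan in later releases, so the string match below is an assumption.

{code}
// Sketch: assumes df1/df2 from the example above, with df2 cached and counted,
// and df3 freshly defined via withColumn (not yet cached or counted itself).
val df3 = df2.withColumn("triple", triple($"value"))

df3.explain() // inspect the physical plan by eye, or programmatically:

// queryExecution is marked @DeveloperApi, so details may differ between versions.
val plan = df3.queryExecution.executedPlan.toString
// "InMemory" is meant to match both InMemoryColumnarTableScan (1.x) and InMemoryTableScan.
val reusesCache = plan.contains("InMemory")
println(s"df3 picks up df2's cache: $reusesCache")
// If instead the plan shows the original parallelize scan plus the twice() UDF,
// df2's cache is being bypassed, which is the behaviour this issue describes.
{code}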