[ 
https://issues.apache.org/jira/browse/SPARK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15125491#comment-15125491
 ] 

Placek edited comment on SPARK-9141 at 2/2/16 1:21 AM:
-------------------------------------------------------

Could this problem still persist in Spark 1.6? I have a program that computes a 
transitive closure, and it visibly slows down with every iteration. Also, 
filtered.explain() suggests that everything is recomputed. The slowdown occurs 
even if the input is defined in such a way that no rows get computed, i.e., 
filtered is always empty (after a few initial iterations). (The code assumes that 
Edge is a simple case class with start and end.)
{code:java}
    // Assumed, not shown here: case class Edge(start: Int, end: Int),
    // and N = the number of partitions passed to parallelize.
    val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
      (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17),
      (17, 18), (18, 19), (19, 20), (20, 21), (21, 22), (22, 23), (23, 24), (24, 25),
      (25, 26), (26, 27), (27, 28), (23, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
      (9, 10), (10, 11), (11, 12), (12, 13))
    // Smaller alternative input:
    //val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7))
    val edges = sc
      .parallelize(e, N)
      .map(p => Edge(p._1, p._2))
      .toDF()
      .cache()
    // Start from the edges leaving node 1, renamed so they can be joined with edges.
    var filtered = edges
      .filter("start = 1")
      .distinct()
      .withColumnRenamed("start", "fStart")
      .withColumnRenamed("end", "fEnd")
      .cache()

    var i = 0
    while (i < 300) {
      i = i + 1
      println("\n i = " + i)
      // Extend every path by one edge and cache the new result.
      filtered = filtered
        .join(edges, filtered("fEnd") === edges("start"))
        .select(filtered("fStart"), edges("end"))
        .withColumnRenamed("start", "fStart") // no-op here: the selected columns are fStart and end
        .withColumnRenamed("end", "fEnd")
        .distinct()
        .cache()

      filtered.show()
    }
{code}
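To make explicit what each iteration computes, here is a minimal sketch of the same loop on plain Scala collections, without Spark. The object name FrontierSketch and the helper iterationsUntilEmpty are hypothetical, and Edge is assumed to hold two Int fields as described above. It only illustrates the join/select/distinct step; it says nothing about Spark's caching behaviour.

```scala
object FrontierSketch {
  // Assumed shape of Edge: a simple case class with start and end.
  final case class Edge(start: Int, end: Int)

  // One loop iteration on plain collections: match frontier.fEnd against
  // edges.start, keep (fStart, end), and deduplicate (Set stands in for distinct).
  def step(frontier: Set[(Int, Int)], edges: Seq[Edge]): Set[(Int, Int)] =
    for {
      (fStart, fEnd) <- frontier
      e <- edges
      if e.start == fEnd
    } yield (fStart, e.end)

  // Number of iterations until the frontier becomes empty, capped at maxIter.
  def iterationsUntilEmpty(edges: Seq[Edge], source: Int, maxIter: Int): Int = {
    var frontier = edges.filter(_.start == source).map(e => (e.start, e.end)).toSet
    var i = 0
    while (frontier.nonEmpty && i < maxIter) {
      frontier = step(frontier, edges)
      i += 1
    }
    i
  }
}
```

On the smaller commented-out input the frontier is empty after five iterations. On the longer input the edge (23, 4) closes a cycle, so the frontier never becomes empty and the loop runs to the cap.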
I also occasionally get the following error, which I don't understand. It is 
non-deterministic and appears only on some runs:
{code:java}
16/01/31 21:33:52 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
        at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
        at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
...
{code}
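One thing that might be worth trying in the loop is the RDD round-trip workaround from the issue description quoted below, applied once per iteration so that each new DataFrame starts from an RDD instead of carrying the whole chain of earlier joins in its plan. The following is only a sketch under that assumption: the names LineageSketch, truncatePlan and step are hypothetical, it has not been verified against 1.6, and whether it removes the slowdown is not established here.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

object LineageSketch {
  // Rebuild a DataFrame from its RDD and schema (the workaround from the
  // issue description), so the result's logical plan starts at that RDD.
  def truncatePlan(sqlContext: SQLContext, df: DataFrame): DataFrame =
    sqlContext.createDataFrame(df.rdd, df.schema)

  // One iteration of the transitive-closure loop with the plan cut afterwards.
  // Expects filtered to have columns fStart/fEnd and edges to have start/end.
  def step(sqlContext: SQLContext, filtered: DataFrame, edges: DataFrame): DataFrame = {
    val next = filtered
      .join(edges, filtered("fEnd") === edges("start"))
      .select(filtered("fStart"), edges("end"))
      .withColumnRenamed("end", "fEnd")
      .distinct()
    val frozen = truncatePlan(sqlContext, next).cache()
    frozen.count()       // materialize the new cache before releasing the old one
    filtered.unpersist() // assumption: the previous iteration is no longer needed
    frozen
  }
}
```

Inside the loop this would be used as filtered = LineageSketch.step(sqlContext, filtered, edges), followed by filtered.show().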



was (Author: zephod):
The same comment as above, without the parenthetical note about Edge; the code and the stack trace are unchanged.


> DataFrame recomputed instead of using cached parent.
> ----------------------------------------------------
>
>                 Key: SPARK-9141
>                 URL: https://issues.apache.org/jira/browse/SPARK-9141
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0, 1.4.1
>            Reporter: Nick Pritchard
>            Assignee: Michael Armbrust
>            Priority: Blocker
>              Labels: cache, dataframe
>             Fix For: 1.5.0
>
>
> As I understand it, DataFrame.cache() is supposed to work the same as 
> RDD.cache(), so that repeated operations on it use the cached results 
> and do not recompute the entire lineage. However, it seems that some DataFrame 
> operations (e.g. withColumn) change the underlying RDD lineage so that cache 
> doesn't work as expected.
> Below is a Scala example that demonstrates this. First, I define two UDFs 
> that use println so that it is easy to see when they are called. Next, 
> I create a simple data frame with one row and two columns. Then I add a 
> column, cache the result, and call count() to force the computation. Lastly, 
> I add another column, cache the result, and call count() again.
> I would have expected the last statement to compute only the last column, 
> since everything else was cached. However, because withColumn() changes the 
> lineage, the whole data frame is recomputed.
> {code}
>     // Imports needed outside spark-shell (the shell already provides the implicits)
>     import org.apache.spark.sql.functions.udf
>     import sqlContext.implicits._
>     // Example UDFs that println when called
>     val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
>     val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }
>     // Initial dataset
>     val df1 = sc.parallelize(Seq(("a", 1))).toDF("name", "value")
>     // Add column by applying twice udf
>     val df2 = df1.withColumn("twice", twice($"value"))
>     df2.cache()
>     df2.count() //prints Computed: twice(1)
>     // Add column by applying triple udf
>     val df3 = df2.withColumn("triple", triple($"value"))
>     df3.cache()
>     df3.count() //prints Computed: twice(1)\nComputed: triple(1)
> {code}
> I found a workaround that helped me understand what was going on behind the 
> scenes, but it doesn't seem like an ideal solution. Basically, I convert to an 
> RDD and then back to a DataFrame, which seems to freeze the lineage. The code 
> below shows the workaround for creating the second data frame so that cache 
> works as expected.
> {code}
>     val df2 = {
>       val tmp = df1.withColumn("twice", twice($"value"))
>       sqlContext.createDataFrame(tmp.rdd, tmp.schema)
>     }
> {code}


