[
https://issues.apache.org/jira/browse/SPARK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15125491#comment-15125491
]
Placek edited comment on SPARK-9141 at 1/31/16 8:47 PM:
--------------------------------------------------------
Can it be that this problem still persists in Spark 1.6? I have a program that
computes a transitive closure, and it visibly slows down with every iteration.
filtered.explain() also suggests that everything is recomputed. The slowdown
occurs even when the input is defined in such a way that no rows get computed,
i.e., filtered is empty after a few initial iterations.
{code:java}
// Assumed setup, not shown in the original snippet:
case class Edge(start: Int, end: Int)
val N = 4 // number of partitions; the value actually used is not given
import sqlContext.implicits._ // needed for .toDF()

val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
  (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17),
  (17, 18), (18, 19), (19, 20), (20, 21), (21, 22), (22, 23), (23, 24), (24, 25),
  (25, 26), (26, 27), (27, 28), (23, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9),
  (9, 10), (10, 11), (11, 12), (12, 13))
//val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7))

val edges = sc
  .parallelize(e, N)
  .map(p => Edge(p._1, p._2))
  .toDF()
  .cache()

// Seed of the transitive closure: all edges starting at vertex 1.
var filtered = edges
  .filter("start = 1")
  .distinct()
  .withColumnRenamed("start", "fStart")
  .withColumnRenamed("end", "fEnd")
  .cache()

var i = 0
while (i < 300) {
  i = i + 1
  println("\n i = " + i)
  // Extend every known path by one edge; this step gets slower each iteration.
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("start", "fStart") // no-op: the selected columns are fStart and end
    .withColumnRenamed("end", "fEnd")
    .distinct()
    .cache()
  filtered.show()
}
{code}
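For reference, a minimal sketch of the same loop with the workaround from the
issue description below applied: the DataFrame is re-created from its RDD and
schema each iteration, which cuts the growing lineage before caching. This
assumes sqlContext is in scope and reuses edges and the seed filtered DataFrame
from the snippet above; I have not verified that it avoids the slowdown.
{code:java}
// Sketch only: reuses edges and the seed filtered DataFrame from above.
var i = 0
while (i < 300) {
  i = i + 1
  val step = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end").as("fEnd"))
    .distinct()
  // Cut the lineage before caching (the workaround described in this issue).
  filtered = sqlContext.createDataFrame(step.rdd, step.schema).cache()
  filtered.show()
}
{code}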
I also sometimes get the following error, which I don't understand (it is
non-deterministic; I only see it occasionally):
{code:java}
16/01/31 21:33:52 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
	at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
	at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
	...
{code}
> DataFrame recomputed instead of using cached parent.
> ----------------------------------------------------
>
> Key: SPARK-9141
> URL: https://issues.apache.org/jira/browse/SPARK-9141
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.0, 1.4.1
> Reporter: Nick Pritchard
> Assignee: Michael Armbrust
> Priority: Blocker
> Labels: cache, dataframe
> Fix For: 1.5.0
>
>
> As I understand, DataFrame.cache() is supposed to work the same as
> RDD.cache(), so that repeated operations on it will use the cached results
> and not recompute the entire lineage. However, it seems that some DataFrame
> operations (e.g. withColumn) change the underlying RDD lineage so that cache
> doesn't work as expected.
> Below is a Scala example that demonstrates this. First, I define two UDFs
> that use println so that it is easy to see when they are called. Then I
> create a simple data frame with one row and two columns. Next, I add a
> column, cache it, and call count() to force the computation. Finally, I add
> another column, cache it, and call count().
> I would have expected the last statement to only compute the last column,
> since everything else was cached. However, because withColumn() changes the
> lineage, the whole data frame is recomputed.
> {code}
> // Example UDFs that println when called
> val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
> val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }
> // Initial dataset
> val df1 = sc.parallelize(Seq(("a", 1))).toDF("name", "value")
> // Add column by applying twice udf
> val df2 = df1.withColumn("twice", twice($"value"))
> df2.cache()
> df2.count() //prints Computed: twice(1)
> // Add column by applying triple udf
> val df3 = df2.withColumn("triple", triple($"value"))
> df3.cache()
> df3.count() //prints Computed: twice(1)\nComputed: triple(1)
> {code}
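> A quick way to see this, assuming the same DataFrames as above, is to look at
> the physical plan before caching the third data frame: if df2's cached data
> were being reused, the plan would scan the in-memory relation instead of
> re-applying the twice UDF.
> {code}
> df2.withColumn("triple", triple($"value")).explain()
> // The plan re-evaluates twice(value) over the original source rather than
> // scanning df2's cached (in-memory) relation.
> {code}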
> I found a workaround, which helped me understand what was going on behind the
> scenes, but it doesn't seem like an ideal solution. Basically, I convert to an
> RDD and then back to a DataFrame, which seems to freeze the lineage. The code
> below shows the workaround for creating the second data frame so that cache
> works as expected.
> {code}
> val df2 = {
>   val tmp = df1.withColumn("twice", twice($"value"))
>   sqlContext.createDataFrame(tmp.rdd, tmp.schema)
> }
> {code}
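> Assuming df2 built this way is then cached and counted as before, repeating
> the last step should only trigger the new column's UDF (a sketch of the
> expected behaviour):
> {code}
> val df3 = df2.withColumn("triple", triple($"value"))
> df3.cache()
> df3.count() // expected to print only Computed: triple(1)
> {code}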