Hi Joseph,

Thank you for the link! Two follow-up questions:
1) Suppose I have the original DataFrame in Tungsten form, i.e. Catalyst types 
cached in the off-heap store. That might be quite useful for iterative workloads 
due to lower GC overhead. Then I convert it to an RDD and back to a DataFrame 
(a sketch follows question 2). Will the resulting DataFrame remain off-heap, or 
will it be on-heap like a regular RDD?
2) How is the mentioned problem handled in GraphFrames? Suppose I want to use 
aggregateMessages in an iterative loop, e.g. to implement PageRank (see the 
second sketch below).
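
To make question 1 concrete, here is a minimal sketch of the round trip I mean 
(the DataFrame df is hypothetical, using the 1.6-era SQLContext API; OFF_HEAP 
caching assumes an off-heap store is configured):

import org.apache.spark.storage.StorageLevel

// Hypothetical DataFrame cached off-heap (Tungsten binary format).
val df = sqlContext.range(0, 100).toDF("id")
df.persist(StorageLevel.OFF_HEAP)
df.count() // materialize the cache

// Round trip: DataFrame -> RDD[Row] -> DataFrame.
// The question: is the result still backed by the off-heap cache,
// or is it now plain on-heap RDD data?
val roundTripped = sqlContext.createDataFrame(df.rdd, df.schema)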
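
And for question 2, a rough sketch of the loop I have in mind (hypothetical 
GraphFrame g whose vertices carry rank and outDegree columns; the per-iteration 
truncation step is the RDD round trip discussed in this thread):

import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame
import org.graphframes.lib.AggregateMessages

val AM = AggregateMessages // message-building columns: AM.src, AM.dst, AM.msg

var vertices = g.vertices // assumed schema: (id, rank, outDegree)
for (i <- 1 to 10) {
  val graph = GraphFrame(vertices, g.edges)
  // Each vertex sends rank / outDegree to its out-neighbors.
  val msgs = graph.aggregateMessages
    .sendToDst(AM.src("rank") / AM.src("outDegree"))
    .agg(sum(AM.msg).as("agg"))
  // Inner join: vertices with no in-edges drop out; fine for a sketch.
  val updated = vertices.join(msgs, "id")
    .select(col("id"), (lit(0.15) + lit(0.85) * col("agg")).as("rank"), col("outDegree"))
  // Without truncation, the query plan grows with every iteration.
  vertices = sqlContext.createDataFrame(updated.rdd, updated.schema).cache()
  vertices.count() // materialize so the previous plan can be dropped
}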

Best regards, Alexander

From: Joseph Bradley [mailto:jos...@databricks.com]
Sent: Friday, May 13, 2016 12:38 PM
To: Ulanov, Alexander <alexander.ula...@hpe.com>
Cc: dev@spark.apache.org
Subject: Re: Shrinking the DataFrame lineage

Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346

I don't have a great method currently, but a hack can get around it: convert the 
DataFrame to an RDD and back, which truncates the query plan lineage.
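
In code, the hack is roughly this (hypothetical df; the schema is passed 
explicitly so the round trip preserves it):

import org.apache.spark.sql.{DataFrame, SQLContext}

// Round-trip through an RDD to cut the Catalyst query plan:
// df.rdd yields the rows, and createDataFrame starts a fresh,
// flat plan from them, so the old lineage is no longer referenced.
def truncateLineage(sqlContext: SQLContext, df: DataFrame): DataFrame =
  sqlContext.createDataFrame(df.rdd, df.schema)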

Joseph

On Wed, May 11, 2016 at 12:46 PM, Ulanov, Alexander 
<alexander.ula...@hpe.com> wrote:
Dear Spark developers,

Recently, I was trying to switch my code from RDDs to DataFrames in order to 
compare the performance. The code computes an RDD in a loop. I use RDD.persist 
followed by RDD.count to force Spark to compute the RDD and cache it, so that it 
does not need to be re-computed on each iteration. However, the same approach 
does not seem to work for DataFrames:

import scala.util.Random
// Random pairs with values in [0, 5), so there are 5 distinct vertex ids.
val rdd = sc.parallelize(1 to 10, 2).map(x => (Random.nextInt(5), Random.nextInt(5)))
val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
val vertices = 
edges.select("from").unionAll(edges.select("to")).distinct().cache()
vertices.count
[Stage 34:=================================================>    (182 + 4) / 200]

res25: Long = 5
If I run count again, it recomputes everything instead of using the cached result:
scala> vertices.count
[Stage 37:================================================>     (178 + 5) / 200]
res26: Long = 5

Could you suggest how to shrink the DataFrame lineage?

Best regards, Alexander
