Hi Joseph,

Thank you for the link! Two follow-up questions:

1) Suppose I have the original DataFrame in Tungsten, i.e. with Catalyst types and cached in the off-heap store. That might be quite useful for iterative workloads due to the lower GC overhead. If I then convert it to an RDD and back to a DataFrame, will the resulting DataFrame remain off-heap, or will it be on-heap like a regular RDD?

2) How is the mentioned problem handled in GraphFrames? Suppose I want to use aggregateMessages in an iterative loop, e.g. to implement PageRank.
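For reference, a minimal spark-shell sketch of the round trip in question 1 (assuming the Spark 1.6-era API used elsewhere in this thread; `sqlContext` is the shell-provided SQLContext). The round-tripped DataFrame carries no persistence of its own, so it has to be persisted again; whether it ends up off-heap again depends on how it is re-persisted:

```scala
import org.apache.spark.storage.StorageLevel

// Persist and materialize a DataFrame (Tungsten in-memory columnar cache).
val df = sqlContext.range(0, 10).toDF("id")
df.persist(StorageLevel.MEMORY_ONLY)
df.count()  // force materialization

// Convert to RDD and back: the new DataFrame has a fresh, short query plan
// but is NOT cached by itself.
val roundTripped = sqlContext.createDataFrame(df.rdd, df.schema)
roundTripped.cache()
roundTripped.count()
```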
Best regards,
Alexander

From: Joseph Bradley [mailto:jos...@databricks.com]
Sent: Friday, May 13, 2016 12:38 PM
To: Ulanov, Alexander <alexander.ula...@hpe.com>
Cc: dev@spark.apache.org
Subject: Re: Shrinking the DataFrame lineage

Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346

I don't have a great method currently, but hacks can get around it: convert the DataFrame to an RDD and back to truncate the query plan lineage.

Joseph

On Wed, May 11, 2016 at 12:46 PM, Ulanov, Alexander <alexander.ula...@hpe.com> wrote:

Dear Spark developers,

Recently, I was trying to switch my code from RDDs to DataFrames in order to compare the performance. The code computes an RDD in a loop. I use RDD.persist followed by RDD.count to force Spark to compute the RDD and cache it, so that it does not need to be recomputed on each iteration. However, this does not seem to work for a DataFrame:

import scala.util.Random
val rdd = sc.parallelize(1 to 10, 2).map(x => (Random.nextInt(5), Random.nextInt(5)))
val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
val vertices = edges.select("from").unionAll(edges.select("to")).distinct().cache()
vertices.count

[Stage 34:=================>                                 (65 + 4) / 200]
[Stage 34:========================>                          (90 + 5) / 200]
[Stage 34:==============================>                   (114 + 4) / 200]
[Stage 34:====================================>             (137 + 4) / 200]
[Stage 34:==========================================>       (157 + 4) / 200]
[Stage 34:=================================================>(182 + 4) / 200]
res25: Long = 5

If I run count again, it recomputes the DataFrame instead of using the cached result:

scala> vertices.count

[Stage 37:=============>                                     (49 + 4) / 200]
[Stage 37:==================>                                (66 + 4) / 200]
[Stage 37:========================>                          (90 + 4) / 200]
[Stage 37:=============================>                    (110 + 4) / 200]
[Stage 37:===================================>              (133 + 4) / 200]
[Stage 37:==========================================>       (157 + 4) / 200]
[Stage 37:================================================> (178 + 5) / 200]
res26: Long = 5

Could you suggest how to shrink the DataFrame lineage?

Best regards,
Alexander
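The hack Joseph describes (convert the DataFrame to an RDD and back to truncate the query-plan lineage) can be sketched against the `vertices` example above. This is only an illustration under the same Spark 1.6-era API; the names follow the snippet in the quoted mail:

```scala
import scala.util.Random

val rdd = sc.parallelize(1 to 10, 2)
  .map(_ => (Random.nextInt(5), Random.nextInt(5)))
val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
val vertices = edges.select("from").unionAll(edges.select("to")).distinct()

// Round-trip through an RDD: the reconstructed DataFrame starts from the
// materialized rows rather than the full query plan, then cache it.
val shortLineage = sqlContext.createDataFrame(vertices.rdd, vertices.schema).cache()
shortLineage.count()  // first count materializes the cache
shortLineage.count()  // later counts read the cached data
```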