Dear Spark developers,
Recently, I was trying to switch my code from RDDs to DataFrames in order to
compare performance. The code computes an RDD in a loop. I use RDD.persist
followed by RDD.count to force Spark to compute the RDD and cache it, so that
it does not need to be re-computed on each iteration.
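For reference, the RDD side of my code follows roughly this pattern (simplified
here; the loop body and names are just illustrative):

import org.apache.spark.rdd.RDD

var current: RDD[Int] = sc.parallelize(1 to 10, 2)
for (i <- 1 to 3) {
  // placeholder for the per-iteration transformation
  val next = current.map(_ + 1).persist()
  // count is an action, so it forces computation and populates the cache
  next.count()
  // release the previous iteration's cached data
  current.unpersist()
  current = next
}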
However, the same approach does not seem to work for DataFrames:
import scala.util.Random
val rdd = sc.parallelize(1 to 10, 2).map(x => (Random.nextInt(5), Random.nextInt(5)))
val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
val vertices = edges.select("from").unionAll(edges.select("to")).distinct().cache()
vertices.count
[Stage 34:=================> (65 + 4) / 200]
[Stage 34:========================> (90 + 5) / 200]
[Stage 34:==============================> (114 + 4) / 200]
[Stage 34:====================================> (137 + 4) / 200]
[Stage 34:==========================================> (157 + 4) / 200]
[Stage 34:=================================================> (182 + 4) / 200]
res25: Long = 5
If I run count again, Spark recomputes the DataFrame instead of using the cached result:
scala> vertices.count
[Stage 37:=============> (49 + 4) / 200]
[Stage 37:==================> (66 + 4) / 200]
[Stage 37:========================> (90 + 4) / 200]
[Stage 37:=============================> (110 + 4) / 200]
[Stage 37:===================================> (133 + 4) / 200]
[Stage 37:==========================================> (157 + 4) / 200]
[Stage 37:================================================> (178 + 5) / 200]
res26: Long = 5
Could you suggest how to shrink the DataFrame lineage?
Best regards, Alexander