Hi,
Let me quote your example code:
var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen milliseconds")
}
println(s"Total time was $totalTime milliseconds")
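(The `time` helper isn't shown in your mail; for the discussion below I'm assuming it is roughly the following sketch, which just runs the block and returns the elapsed wall-clock time in milliseconds.)

// Hypothetical `time` helper, assumed only for illustration -- not taken
// from the original mail.
def time[T](block: => T): Long = {
  val start = System.currentTimeMillis()
  block                                  // run the measured work
  System.currentTimeMillis() - start     // elapsed milliseconds
}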
Basically, on each iteration you union one more DataFrame into the accumulated result and then run the aggregation over that whole union. I don't see any operations whose results are reused across iterations:
1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
...
Your first example performs the two aggregation operations only once. But your second example, as shown above, repeats these aggregations on the ever-growing union in every iteration, so its running time grows as the number of iterations increases. A sketch of one way to keep each iteration's work bounded follows below.
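If what you want is a running total, one option is to keep the accumulated state pre-aggregated at the ("cat1", "cat2") level and fold each new DataFrame into it, so each iteration only aggregates the new batch plus a small state instead of the full raw union. This is just a rough, untested sketch using the column names from your mail; it assumes a SparkSession named `spark` and the same `dataframes` sequence:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, sum}
import spark.implicits._   // for the $"..." column syntax

var running: DataFrame = null
for (x <- dataframes) {
  // Aggregate only the new batch, then merge it into the running
  // (cat1, cat2) sums. The running state stays roughly the size of the
  // distinct key space instead of growing with the raw data.
  val partial = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
  running =
    if (running == null) partial
    else running.union(partial).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
  running.cache()   // materialize the small state; unpersist old copies if memory matters
  val grouped2 = running.groupBy("cat1").agg(sum($"v"), count($"cat2"))
  grouped2.show()
}

Because sum is re-aggregatable, merging per-batch sums gives the same per-(cat1, cat2) totals as aggregating the full union, and the second groupBy then produces the same result as in your code.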
-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/