Hi,

Let me quote your example code:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2")
      .agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen miliseconds")
}
println(s"Total time was $totalTime miliseconds")


Basically, what you are doing is unioning one more dataframe on each iteration
and then running the aggregation over that unioned data. I don't see any
reused operations here.

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union (x3 union (x1 union x2)))
...

Your first example performs the two aggregation operations only once. But your
second example, as shown above, performs these aggregation operations in every
iteration, each time over a union that contains one more dataframe than the
previous one, so the total work grows roughly quadratically with the number of
iterations. That is why the time of the second example keeps growing as the
iterations go on.
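
If the concern is recomputing the earlier input dataframes from their sources
on every iteration, one thing you could try is caching the running union.
Below is just a sketch based on your code (dataframes and the column names are
taken from it, and it assumes a SparkSession named spark is in scope). Note it
does not make the aggregation incremental: each iteration still scans and
aggregates the whole union, so per-iteration time will still grow.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, sum}
import spark.implicits._  // assumes a SparkSession named spark

var allDF: DataFrame = null
for (x <- dataframes) {
  val previous = allDF
  allDF = if (previous == null) x else previous.union(x)
  allDF.cache()  // lazy; the show() below materializes the cached union

  val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
  grouped.groupBy("cat1").agg(sum($"v"), count($"cat2")).show()

  // Drop the previous cached union only after the new one has been computed.
  if (previous != null) previous.unpersist()
}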
 




-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
