Hi, Every iteration the data you run aggregation on it is different. As I showed in previous reply:
1st iteration: aggregation(x1 union x2) 2nd iteration: aggregation(x3 union (x1 union x2)) 3rd iteration: aggregation(x4 union(x3 union (x1 union x2))) In 1st you run aggregation on the data of x1 and x2. In 2nd the data is x1, x2 and x3. Even you work on the same RDD, you won't see reuse of the shuffle data because the shuffle data is different. In your second example, I think the way to reduce the computation is like: var totalTime: Long = 0 var allDF: org.apache.spark.sql.DataFrame = null for { x <- dataframes } { val timeLen = time { allDF = if (allDF == null) x else allDF.union(x) // Union previous aggregation summary with new dataframe in this window val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v")) val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2")) grouped2.show() allDF = grouped // Replace the union of data with aggregated summary } totalTime += timeLen println(s"Took $timeLen miliseconds") } println(s"Total time was $totalTime miliseconds") You don't need to recompute the aggregation of previous dataframes in each iteration. You just need to get the summary and union it with new dataframe to compute the newer aggregation summary in next iteration. It is more similar to streaming case, I don't think you can/should recompute all the data since the beginning of a stream. assaf.mendelson wrote > The reason I thought some operations would be reused is the fact that > spark automatically caches shuffle data which means the partial > aggregation for pervious dataframes would be saved. Unfortunatly, as Mark > Hamstra explained this is not the case because this is considered a new > RDD and therefore the previous data is lost. > > I am still wondering if there is any way to do high performance streaming > of SQL. Basically this is not far from what DStream would do assuming we > convert a sliding window (e.g. 24 hours every 15 minutes) as we would be > doing a foreachRDD which would do the joining behind the scenes. > The problem is that any attempt to do a streaming like this results in > performance which is hundreds of times slower than batch. > Is there a correct way to do such an aggregation on streaming data (using > dataframes rather than RDD operations). > Assaf. > > > > From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto: > ml-node+s1001551n20361h80@.nabble > ] > Sent: Monday, December 26, 2016 5:42 PM > To: Mendelson, Assaf > Subject: Re: Shuffle intermidiate results not being cached > > > Hi, > > Let me quote your example codes: > > var totalTime: Long = 0 > var allDF: org.apache.spark.sql.DataFrame = null > for { > x <- dataframes > } { > val timeLen = time { > allDF = if (allDF == null) x else allDF.union(x) > val grouped = allDF.groupBy("cat1", > "cat2").agg(sum($"valToAdd").alias("v")) > val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2")) > grouped2.show() > } > totalTime += timeLen > println(s"Took $timeLen miliseconds") > } > println(s"Total time was $totalTime miliseconds") > > > Basically what you do is to union some dataframes for each iteration, and > do aggregation on this union data. I don't see any reused operations. > > 1st iteration: aggregation(x1 union x2) > 2nd iteration: aggregation(x3 union (x1 union x2)) > 3rd iteration: aggregation(x4 union(x3 union (x1 union x2))) > ... > > Your first example just does two aggregation operations. But your second > example like above does this aggregation operations for each iteration. So > the time of second example grows as the iteration increases. > > Liang-Chi Hsieh | @viirya > Spark Technology Center > http://www.spark.tc/ > > ________________________________ > If you reply to this email, your message will be added to the discussion > below: > http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20361.html > To start a new topic under Apache Spark Developers List, email > ml-node+s1001551n1h20@.nabble > <mailto: > ml-node+s1001551n1h20@.nabble > > > To unsubscribe from Apache Spark Developers List, click > here<http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=YXNzYWYubWVuZGVsc29uQHJzYS5jb218MXwtMTI4OTkxNTg1Mg==>. > NAML<http://apache-spark-developers-list.1001551.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> ----- Liang-Chi Hsieh | @viirya Spark Technology Center http://www.spark.tc/ -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Shuffle-intermidiate-results-not-being-cached-tp20358p20371.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe e-mail: dev-unsubscr...@spark.apache.org