The reason I thought some operations would be reused is that Spark 
automatically caches shuffle data, which I assumed meant the partial 
aggregations for previous dataframes would be saved. Unfortunately, as Mark 
Hamstra explained, this is not the case: each iteration builds what Spark 
considers a new RDD, so the shuffle output computed for the previous plan is 
not reused and that work is lost.
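
The only workaround I know of is to persist the growing union explicitly, so 
each iteration reads the materialized data instead of recomputing the whole 
lineage. A minimal sketch, assuming the same `dataframes` collection as in the 
code quoted below (the persist/count/unpersist pattern is my addition, not 
something from the thread):

import org.apache.spark.storage.StorageLevel

var allDF: org.apache.spark.sql.DataFrame = null
for (x <- dataframes) {
  val combined = if (allDF == null) x else allDF.union(x)
  combined.persist(StorageLevel.MEMORY_AND_DISK)
  combined.count()                      // materialize before aggregating
  if (allDF != null) allDF.unpersist()  // drop the previous cached union
  allDF = combined
  // ... run the aggregations on allDF as in the quoted code below ...
}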

I am still wondering whether there is any way to do high-performance streaming 
SQL. This is not far from what a DStream would do if we express a sliding 
window (e.g. 24 hours, sliding every 15 minutes) as a foreachRDD that does the 
joining behind the scenes.
The problem is that any attempt to stream like this yields performance 
hundreds of times slower than batch.
Is there a correct way to do such an aggregation on streaming data (using 
dataframes rather than RDD operations)?
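
For concreteness, here is the shape of what I am after as a minimal 
Structured Streaming sketch; the streaming dataframe `events` and its 
ts/cat1/cat2/valToAdd columns are assumed for illustration, not taken from 
the thread:

import spark.implicits._
import org.apache.spark.sql.functions._

// `events` is a hypothetical streaming dataframe with an event-time
// column `ts` plus the cat1/cat2/valToAdd columns from my example.
val windowed = events
  .groupBy(window($"ts", "24 hours", "15 minutes"), $"cat1", $"cat2")
  .agg(sum($"valToAdd").alias("v"))

windowed.writeStream
  .outputMode("complete")   // complete mode: re-emit the full aggregate
  .format("console")
  .start()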
Assaf.



From: Liang-Chi Hsieh [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n20361...@n3.nabble.com]
Sent: Monday, December 26, 2016 5:42 PM
To: Mendelson, Assaf
Subject: Re: Shuffle intermediate results not being cached


Hi,

Let me quote your example code:

import spark.implicits._
import org.apache.spark.sql.functions._

// Timing helper (assumed; not shown in the original snippet): returns
// the elapsed wall-clock time of a block in milliseconds.
def time(block: => Unit): Long = {
  val start = System.currentTimeMillis()
  block
  System.currentTimeMillis() - start
}

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for (x <- dataframes) {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen milliseconds")
}
println(s"Total time was $totalTime milliseconds")


Basically what you do is union one more dataframe in each iteration and run 
the aggregation over the whole unioned data. I don't see any reused operations:

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union (x3 union (x1 union x2)))
...

Your first example does just two aggregation operations in total. But your 
second example, shown above, repeats those aggregation operations in every 
iteration over an ever-growing union, so its running time grows as the 
iterations go on.
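
One possible restructuring, sketched here for illustration rather than taken 
from the thread: aggregate each new chunk on its own and merge it into a 
compact running result, so an iteration touches only the new data plus the 
small aggregate. It assumes the same `dataframes` collection as in the quoted 
code.

import spark.implicits._
import org.apache.spark.sql.functions._

var running: org.apache.spark.sql.DataFrame = null
for (x <- dataframes) {
  // Aggregate only the new chunk...
  val chunkAgg = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
  // ...and fold it into the compact running result.
  val updated =
    if (running == null) chunkAgg
    else running.union(chunkAgg)
      .groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
  updated.persist().count()   // materialize so later steps read the cache
  if (running != null) running.unpersist()
  running = updated
  running.groupBy("cat1").agg(sum($"v"), count($"cat2")).show()
}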

Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
