I understand the actual dataframe is different, but the underlying partitions are not (hence the importance of Mark's response). The code you suggested would not work, as allDF and x would have different schemas (x is the original and allDF becomes the grouped result).
I can do something like this:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, sum}
import spark.implicits._ // for the $"..." column syntax

var totalTime: Long = 0
var allDF: DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    val grouped = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    // Fold the new slice's summary into the running summary.
    allDF = if (allDF == null) grouped else {
      allDF.union(grouped).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
    }
    val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen milliseconds")
}
println(s"Overall time was $totalTime milliseconds")
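The time helper used in these snippets isn't shown in the thread; a minimal version consistent with how it is called (it must run the block and return the elapsed milliseconds) might look like this:

// Hypothetical helper, not from the original thread: runs a block and
// returns the elapsed wall-clock time in milliseconds.
def time(block: => Unit): Long = {
  val start = System.currentTimeMillis()
  block
  System.currentTimeMillis() - start
}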
This approach indeed improves performance (I actually had a couple more tries), but:
1. It still gives poor performance: for 167 slices I get a throughput which is 10 times lower than batch, even after some tuning, including caching and coalescing.
2. It only works because the aggregation here is a sum and we never need to "forget" data. More general aggregations would have to be joined together (which can't be done for count distinct, for example), and we would need to "forget" frames when they move out of the window; we can subtract a sum, but not a max. A sketch of what sum-only "forgetting" looks like follows this list.
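For illustration only, here is roughly what that sum-based "forgetting" could look like; summary, newSlice and oldSlice are hypothetical DataFrames holding the running aggregate and the slices entering and leaving the window:

// Hypothetical sketch: slide the window by adding the entering slice's sums
// and subtracting the leaving slice's sums. This only works because sum is
// invertible; max or count distinct have no such inverse.
val entering = newSlice.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
val leaving = oldSlice.groupBy("cat1", "cat2").agg((-sum($"valToAdd")).alias("v"))
summary = summary.union(entering).union(leaving)
  .groupBy("cat1", "cat2").agg(sum($"v").alias("v"))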
The best solution I found so far (performance wise) was to write a custom UDAF which does the windowing internally. This still had 8 times lower throughput than batch, required a lot of coding, and is not a general solution.
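To make the idea concrete, here is a minimal sketch of such a UDAF (a hypothetical reconstruction, not my actual implementation); it assumes every row carries a sliceId column, keeps one partial sum per slice in its buffer, and drops expired slices at evaluation time:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical sketch only: a windowed sum that can "forget" expired slices.
// Assumes each input row carries the id of the slice it belongs to.
class WindowedSum(windowSlices: Int) extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(
    StructField("sliceId", IntegerType) :: StructField("valToAdd", LongType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("perSlice", MapType(IntegerType, LongType)) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Map.empty[Int, Long]

  // Accumulate one partial sum per slice instead of a single total.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val m = buffer.getAs[Map[Int, Long]](0)
    val slice = input.getInt(0)
    buffer(0) = m + (slice -> (m.getOrElse(slice, 0L) + input.getLong(1)))
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val m1 = buffer1.getAs[Map[Int, Long]](0)
    val m2 = buffer2.getAs[Map[Int, Long]](0)
    buffer1(0) = (m1.keySet ++ m2.keySet)
      .map(k => k -> (m1.getOrElse(k, 0L) + m2.getOrElse(k, 0L))).toMap
  }

  // "Forget" slices that fell out of the window before producing the result.
  def evaluate(buffer: Row): Any = {
    val m = buffer.getAs[Map[Int, Long]](0)
    if (m.isEmpty) 0L
    else {
      val latest = m.keys.max
      m.collect { case (k, v) if k > latest - windowSlices => v }.sum
    }
  }
}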
I am looking for an approach that improves the performance even more (preferably either on par with batch, or within a relatively low factor that remains constant as the number of slices rises), including the option to "forget" frames.
Assaf.
From: Liang-Chi Hsieh [via Apache Spark Developers List]
[mailto:[email protected]]
Sent: Wednesday, December 28, 2016 3:59 AM
To: Mendelson, Assaf
Subject: RE: Shuffle intermidiate results not being cached
Hi,
In every iteration, the data you run the aggregation on is different. As I showed in my previous reply:
1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union (x3 union (x1 union x2)))
In the 1st you run the aggregation on the data of x1 and x2. In the 2nd the data is x1, x2 and x3. Even though you work on the same RDD, you won't see reuse of the shuffle data, because the shuffle data itself is different.
In your second example, I think the way to reduce the computation is like this:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    // Union the previous aggregation summary with the new dataframe in this window.
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
    allDF = grouped // Replace the union of data with the aggregated summary.
  }
  totalTime += timeLen
  println(s"Took $timeLen milliseconds")
}
println(s"Total time was $totalTime milliseconds")
You don't need to recompute the aggregation of the previous dataframes in each iteration. You just need to keep the summary and union it with the new dataframe to compute the new aggregation summary in the next iteration. This is more similar to the streaming case; I don't think you can or should recompute all the data since the beginning of a stream.
assaf.mendelson wrote
The reason I thought some operations would be reused is the fact that Spark automatically caches shuffle data, which means the partial aggregation for previous dataframes would be saved. Unfortunately, as Mark Hamstra explained, this is not the case, because each union is considered a new RDD and therefore the previous data is lost.
I am still wondering if there is any way to do high-performance streaming of SQL. Basically this is not far from what DStream would do for a sliding window (e.g. 24 hours, sliding every 15 minutes), where a foreachRDD would do the joining behind the scenes.
The problem is that any attempt to do streaming like this results in performance which is hundreds of times slower than batch.
Is there a correct way to do such an aggregation on streaming data (using dataframes rather than RDD operations)?
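For comparison, the DStream API can already maintain such a window incrementally via reduceByKeyAndWindow with an inverse "subtract" function; a hypothetical sketch (eventStream and its record layout are assumed, not from the original code):

import org.apache.spark.streaming.Minutes

// Hypothetical sketch: eventStream is an assumed DStream[(String, String, Long)]
// carrying (cat1, cat2, valToAdd) records. Note that the inverse-function
// variant requires checkpointing to be enabled on the StreamingContext.
val pairs = eventStream.map { case (cat1, cat2, valToAdd) => ((cat1, cat2), valToAdd) }
val windowedSums = pairs.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b, // add values entering the window
  (a: Long, b: Long) => a - b, // "forget" values leaving the window (needs an invertible op)
  Minutes(24 * 60),            // window length: 24 hours
  Minutes(15)                  // slide interval: 15 minutes
)
windowedSums.print()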
Assaf.
From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:[hidden email]]
Sent: Monday, December 26, 2016 5:42 PM
To: Mendelson, Assaf
Subject: Re: Shuffle intermidiate results not being cached
Hi,
Let me quote your example code:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen milliseconds")
}
println(s"Total time was $totalTime milliseconds")
Basically what you do is union some dataframes in each iteration and run the aggregation on that union of the data. I don't see any reused operations:
1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union (x3 union (x1 union x2)))
...
Your first example just does two aggregation operations. But your second example, as above, does the aggregation in every iteration, so the time of the second example grows as the iterations increase.
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/