[https://issues.apache.org/jira/browse/SPARK-47836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836605#comment-17836605]
Tanel Kiis commented on SPARK-47836:
------------------------------------
{noformat}
QuantileSummaries:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Only insert                                         168            171           8          6.0         167.6       1.0X
Insert & merge                                     6690           6792         143          0.1        6690.3       0.0X
KllFloatsSketch insert                               44             47           6         22.5          44.4       3.8X
KllFloatsSketch Insert & merge                       55             57           6         18.3          54.7       3.1X
{noformat}
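The derived columns in the table follow from the per-row times. Each iteration processes 1,000,000 values, so Rate(M/s) = 1000 / Per Row(ns), and Relative is the baseline ("Only insert") time divided by each case's time. The small Scala sketch below recomputes those two columns from the table's Per Row(ns) values. The object and method names are hypothetical. It only checks the arithmetic and is not Spark's Benchmark code.

{code:scala}
// Hypothetical helper: recomputes Rate(M/s) and Relative from the Per Row(ns)
// column of the table above (1,000,000 values per benchmark iteration).
object BenchmarkColumns {
  val perRowNs: Seq[(String, Double)] = Seq(
    "Only insert" -> 167.6,
    "Insert & merge" -> 6690.3,
    "KllFloatsSketch insert" -> 44.4,
    "KllFloatsSketch Insert & merge" -> 54.7)

  /** Millions of rows per second: (1e9 ns/s / ns per row) / 1e6 = 1000 / ns per row. */
  def rateMPerSec(ns: Double): Double = 1000.0 / ns

  /** Speed-up of a case relative to the baseline (first) case. */
  def relative(baselineNs: Double, ns: Double): Double = baselineNs / ns

  def main(args: Array[String]): Unit = {
    val baseline = perRowNs.head._2
    for ((name, ns) <- perRowNs) {
      println(f"$name%-32s ${rateMPerSec(ns)}%5.1f M/s ${relative(baseline, ns)}%4.1fX")
    }
  }
}
{code}

By the same arithmetic, "Insert & merge" spends about 40 times more time per row than "Only insert" (6690.3 / 167.6 ≈ 39.9).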
{code:scala}
import scala.util.Random

import org.apache.datasketches.kll.{KllDoublesSketch, KllSketch}

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Note: the "KllFloatsSketch" case names match the results table above,
// but both of those cases actually use KllDoublesSketch.
object QuantileSummariesBenchmark extends BenchmarkBase {
  def test(name: String, numValues: Int): Unit = {
    runBenchmark(name) {
      val values = (1 to numValues).map(_ => Random.nextDouble())
      val benchmark = new Benchmark(name, numValues, output = output)

      benchmark.addCase("Only insert") { _: Int =>
        // Insert all values into a single summary, then compress once.
        var summaries = new QuantileSummaries(
          compressThreshold = QuantileSummaries.defaultCompressThreshold,
          relativeError = QuantileSummaries.defaultRelativeError)
        for (value <- values) {
          summaries = summaries.insert(value)
        }
        summaries = summaries.compress()
        println("Median: " + summaries.query(0.5))
      }

      benchmark.addCase("Insert & merge") { _: Int =>
        // Insert values in batches of 1000 and merge the summaries.
        val summaries = values.grouped(1000).map { vs =>
          var partialSummaries = new QuantileSummaries(
            compressThreshold = QuantileSummaries.defaultCompressThreshold,
            relativeError = QuantileSummaries.defaultRelativeError)
          for (value <- vs) {
            partialSummaries = partialSummaries.insert(value)
          }
          partialSummaries.compress()
        }.reduce(_.merge(_))
        println("Median: " + summaries.query(0.5))
      }

      benchmark.addCase("KllFloatsSketch insert") { _: Int =>
        // Insert all values into a single KLL sketch sized for the same relative error.
        val summaries = KllDoublesSketch.newHeapInstance(
          KllSketch.getKFromEpsilon(QuantileSummaries.defaultRelativeError, true))
        for (value <- values) {
          summaries.update(value)
        }
        println("Median: " + summaries.getQuantile(0.5))
      }

      benchmark.addCase("KllFloatsSketch Insert & merge") { _: Int =>
        // Insert values in batches of 1000 and merge the sketches.
        val summaries = values.grouped(1000).map { vs =>
          val partialSummaries = KllDoublesSketch.newHeapInstance(
            KllSketch.getKFromEpsilon(QuantileSummaries.defaultRelativeError, true))
          for (value <- vs) {
            partialSummaries.update(value)
          }
          partialSummaries
        }.reduce { (a, b) =>
          a.merge(b) // merge mutates `a` in place, so return it explicitly
          a
        }
        println("Median: " + summaries.getQuantile(0.5))
      }

      benchmark.run()
    }
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    test("QuantileSummaries", 1000000)
  }
}
{code}
> Performance problem with QuantileSummaries
> ------------------------------------------
>
> Key: SPARK-47836
> URL: https://issues.apache.org/jira/browse/SPARK-47836
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.5.1
> Reporter: Tanel Kiis
> Priority: Major
>
> SPARK-29336 caused a severe performance regression.
> In practice, a partial aggregation with several approx_percentile calls ran in
> less than an hour, while the final aggregation after the exchange would have
> taken over a week.
> With plain percentile, the partial aggregation took about the same time, and
> the final aggregation ran very quickly.
> I wrote a benchmark, and it shows that the merge operation is extremely
> slow:
> https://github.com/tanelk/spark/commit/3b16f429a77b10003572295f42361fbfb2f3c63e
> My experiments suggest that the cost is O(n^2) in the number of partitions
> (the number of partial aggregations to merge).
> When I reverted the SPARK-29336 changes, the "Only insert" and
> "Insert & merge" cases took very similar times.
> The cause seems to be that compressImmut barely reduces the number of samples
> after merges, so it keeps iterating over an ever-growing list.
> I was not able to figure out how to fix the issue without reverting the
> PR.
> I also added a benchmark with KllDoublesSketch from the Apache DataSketches
> project, and it performed even better than this class did before that PR.
> Its only downside is that it is non-deterministic.
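The quadratic behavior described above is what you would expect if each merge walks the whole running summary and compression fails to shrink it. The following is a simplified, hypothetical cost model, not Spark's QuantileSummaries code. It assumes that each partial summary holds s samples, that a merge touches every sample of both inputs, and that the running result either keeps growing or is capped at a fixed size by effective compression. All names here are illustrative.

{code:scala}
// Hypothetical cost model (not Spark code): counts samples touched when
// left-folding k partial summaries of s samples each, as reduce(_.merge(_)) does.
object MergeCostModel {
  /** The merged summary never shrinks: it grows by s samples per merge. */
  def uncompressedCost(k: Int, s: Long): Long = {
    var size = s  // samples in the running merged summary
    var cost = 0L
    for (_ <- 2 to k) {
      cost += size + s  // a merge iterates over both inputs
      size += s         // no effective compression, so the list keeps growing
    }
    cost
  }

  /** Compression caps the merged summary at `cap` samples. */
  def boundedCost(k: Int, s: Long, cap: Long): Long = {
    var size = s
    var cost = 0L
    for (_ <- 2 to k) {
      cost += size + s
      size = math.min(size + s, cap)  // effective compression bounds the size
    }
    cost
  }

  def main(args: Array[String]): Unit = {
    for (k <- Seq(10, 100, 1000)) {
      println(s"k=$k uncompressed=${uncompressedCost(k, 100)} bounded=${boundedCost(k, 100, 500)}")
    }
  }
}
{code}

With s = 100, going from 10 to 1000 partial summaries raises the uncapped cost from 5,400 to 50,049,900 touched samples (s(k-1)(k+2)/2, roughly quadratic in k). The capped variant grows only linearly, from 4,400 to 598,400.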
--
This message was sent by Atlassian Jira
(v8.20.10#820010)