[ 
https://issues.apache.org/jira/browse/SPARK-47836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836605#comment-17836605
 ] 

Tanel Kiis commented on SPARK-47836:
------------------------------------


{noformat}
QuantileSummaries:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Only insert                                         168            171           8          6.0         167.6       1.0X
Insert & merge                                     6690           6792         143          0.1        6690.3       0.0X
KllFloatsSketch insert                               44             47           6         22.5          44.4       3.8X
KllFloatsSketch Insert & merge                       55             57           6         18.3          54.7       3.1X
{noformat}


{code:scala}
/**
 * Benchmarks [[QuantileSummaries]] against the Apache DataSketches `KllDoublesSketch`.
 *
 * Each sketch is measured in two ways:
 *  - inserting every value into a single sketch, and
 *  - inserting values into many small partial sketches and then merging them, which mimics a
 *    final aggregation that combines per-partition partial results.
 *
 * Every case prints the median so the computed sketch is observably used by the benchmark.
 */
object QuantileSummariesBenchmark extends BenchmarkBase {

  /** Number of values fed into each partial sketch in the "Insert & merge" cases. */
  private val DefaultBatchSize = 1000

  /** Creates an empty [[QuantileSummaries]] with the default compress threshold and error. */
  private def newQuantileSummaries(): QuantileSummaries =
    new QuantileSummaries(
      compressThreshold = QuantileSummaries.defaultCompressThreshold,
      relativeError = QuantileSummaries.defaultRelativeError)

  /**
   * Creates an empty on-heap KLL sketch whose `k` is derived from the same relative error that
   * [[QuantileSummaries]] uses by default, so the two sketches are compared at similar accuracy.
   */
  private def newKllSketch(): KllDoublesSketch =
    KllDoublesSketch.newHeapInstance(
      // Second argument is DataSketches' `pmf` flag; kept as `true` like the original benchmark.
      KllSketch.getKFromEpsilon(QuantileSummaries.defaultRelativeError, true))

  /**
   * Runs all benchmark cases over `numValues` random doubles.
   *
   * @param name      benchmark (and report section) name
   * @param numValues number of values to insert; must be positive
   * @param batchSize number of values per partial sketch in the merge cases; must be positive
   */
  def test(name: String, numValues: Int, batchSize: Int = DefaultBatchSize): Unit = {
    // `reduce` in the merge cases would otherwise fail obscurely on an empty batch iterator.
    require(numValues > 0, s"numValues must be positive, got $numValues")
    require(batchSize > 0, s"batchSize must be positive, got $batchSize")

    runBenchmark(name) {
      val values = (1 to numValues).map(_ => Random.nextDouble())

      val benchmark = new Benchmark(name, numValues, output = output)

      benchmark.addCase("Only insert") { _: Int =>
        // `insert` returns a new summary, so thread it through a fold.
        val summaries = values.foldLeft(newQuantileSummaries())(_.insert(_)).compress()

        println("Median: " + summaries.query(0.5))
      }

      benchmark.addCase("Insert & merge") { _: Int =>
        // One compressed partial summary per batch, then merge them all together.
        val summaries = values.grouped(batchSize)
          .map(batch => batch.foldLeft(newQuantileSummaries())(_.insert(_)).compress())
          .reduce(_.merge(_))

        println("Median: " + summaries.query(0.5))
      }

      // NOTE: the "KllFloatsSketch" case labels are kept unchanged so the results stay
      // comparable with previously published runs; the sketch measured is KllDoublesSketch.
      benchmark.addCase("KllFloatsSketch insert") { _: Int =>
        // Insert every value into a single sketch (no batching, no merging).
        val sketch = newKllSketch()
        values.foreach(v => sketch.update(v))

        println("Median: " + sketch.getQuantile(0.5))
      }

      benchmark.addCase("KllFloatsSketch Insert & merge") { _: Int =>
        // Build one partial sketch per batch, then merge them. `merge` updates the receiver in
        // place, so the reduction folds every partial into the first one and returns it.
        val sketch = values.grouped(batchSize)
          .map { batch =>
            val partial = newKllSketch()
            batch.foreach(v => partial.update(v))
            partial
          }
          .reduce { (acc, next) =>
            acc.merge(next)
            acc
          }

        println("Median: " + sketch.getQuantile(0.5))
      }

      benchmark.run()
    }
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    // Plain literal rather than `1_000_000`: numeric separators do not compile on Scala 2.12.
    test("QuantileSummaries", 1000000)
  }
}
{code}


> Performance problem with QuantileSummaries
> ------------------------------------------
>
>                 Key: SPARK-47836
>                 URL: https://issues.apache.org/jira/browse/SPARK-47836
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.1
>            Reporter: Tanel Kiis
>            Priority: Major
>
> SPARK-29336 caused a severe performance regression.
> In practice, a partial_aggregate with several approx_percentile calls ran in less
> than an hour, while the final aggregation after the exchange would have taken over a
> week.
> The exact percentile function took about the same time in the first (partial) stage,
> and its final aggregation ran very quickly.
> I wrote a benchmark, and it shows that the merge operation is extremely
> slow: 
> https://github.com/tanelk/spark/commit/3b16f429a77b10003572295f42361fbfb2f3c63e
> From my experiments, the merge cost appears to grow as O(n^2) in the number of
> partitions (the number of partial aggregations to merge).
> When I reverted the changes made in that PR, the "Only insert" and
> "Insert & merge" timings were very similar.
> The cause seems to be that compressImmut barely reduces the number of samples
> after merges, so it keeps iterating over an ever-growing list.
> I was not able to figure out how to fix the issue without just reverting the 
> PR.
> I also added a benchmark with KllDoublesSketch from the Apache DataSketches
> project, and it performed even better than this class did before that PR.
> The only downside is that it is non-deterministic. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to