[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412799#comment-16412799 ]

Valentin Nikotin commented on SPARK-23791:
------------------------------------------

When testing the aggregation with different numbers of columns (v2.3.0),
I found that with 100 columns both cases take approximately the same time.
With 90 columns the Spark job failed with
{noformat}
18/03/25 00:11:33 ERROR Executor: Exception in task 117.0 in stage 1.0 (TID 4)
java.lang.ClassFormatError: Too many arguments in method signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage2
        at java.lang.ClassLoader.defineClass1(Native Method)
{noformat}
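
For reference, this is roughly how the column-count sweep can be driven (a minimal sketch, assuming the test() helper and the WHOLESTAGE_CODEGEN_ENABLED import from the issue description below; the particular Seq of counts is illustrative):

{code:scala}
// Time both modes for several column counts to locate where codegen
// degrades or fails. Per the observation above, the 90-column case
// throws ClassFormatError with codegen enabled.
for (cnt <- Seq(50, 90, 100)) {
  spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
  val withCodegen = test(cnt)
  spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
  val withoutCodegen = test(cnt)
  println(s"cnt=$cnt: with codegen ${withCodegen}s, without ${withoutCodegen}s")
}
{code}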


> Sub-optimal generated code for sum aggregating
> ----------------------------------------------
>
>                 Key: SPARK-23791
>                 URL: https://issues.apache.org/jira/browse/SPARK-23791
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Valentin Nikotin
>            Priority: Major
>              Labels: performance
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It appears that with wholeStage codegen enabled, a simple Spark job 
> performing a sum aggregation over 50 columns runs ~4 times slower than 
> without wholeStage codegen.
> Please check the test case code below. Note that the udf is used only to 
> prevent the elimination optimizations that would otherwise be applied to 
> literal columns. 
> {code:scala}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.{Column, DataFrame, SparkSession}
> import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
> object SPARK_23791 {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder()
>       .master("local[4]")
>       .appName("test")
>       .getOrCreate()
>     def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame) =
>       (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value))
>     // udf that always returns null; keeps the constant columns from
>     // being folded into literals (see note above)
>     val dummy = udf(() => Option.empty[Int])
>     def test(cnt: Int = 50, rows: Int = 5000000, grps: Int = 1000): Double = {
>       val t0 = System.nanoTime()
>       spark.range(rows).toDF()
>         .withColumn("grp", col("id").mod(grps))
>         .transform(addConstColumns("null_", cnt, dummy()))
>         .groupBy("grp")
>         .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*)
>         .collect()
>       val t1 = System.nanoTime()
>       (t1 - t0) / 1e9
>     }
>     // Run each configuration three times, toggling wholeStage codegen
>     val timings = for (i <- 1 to 3) yield {
>       spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
>       val with_wholestage = test()
>       spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
>       val without_wholestage = test()
>       (with_wholestage, without_wholestage)
>     }
>     timings.foreach(println)
>     println("Press enter ...")
>     System.in.read()
>   }
> }
> {code}
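
To see the actual generated code behind the slowdown, the codegen output of the plan can be dumped with Spark's debug helpers (a sketch; df here stands for the aggregated DataFrame built inside test() above, before collect()):

{code:scala}
import org.apache.spark.sql.execution.debug._

// Prints the Java source produced by whole-stage codegen for each
// codegen subtree of df's physical plan.
df.debugCodegen()
{code}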


