[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Valentin Nikotin updated SPARK-23791: ------------------------------------- Description: It appears that, with wholeStage codegen enabled, a simple Spark job performing a sum aggregation of 50 columns runs ~4 times slower than without wholeStage codegen. Please check the test case code. Please note that the UDF is there only to prevent elimination optimizations that could be applied to literals. {code:scala} import org.apache.spark.sql.functions._ import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED object SPARK_23791 { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .master("local[4]") .appName("test") .getOrCreate() def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame) = (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value)) val dummy = udf(() => Option.empty[Int]) def test(cnt: Int = 50, rows: Int = 5000000, grps: Int = 1000): Double = { val t0 = System.nanoTime() spark.range(rows).toDF() .withColumn("grp", col("id").mod(grps)) .transform(addConstColumns("null_", cnt, dummy())) .groupBy("grp") .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*) .collect() val t1 = System.nanoTime() (t1 - t0) / 1e9 } val timings = for (i <- 1 to 3) yield { spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true) val with_wholestage = test() spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false) val without_wholestage = test() (with_wholestage, without_wholestage) } timings.foreach(println) println("Press enter ...") System.in.read() } } {code} was: It appears that, with wholeStage codegen enabled, a simple Spark job performing a sum aggregation of 50 nullable columns runs ~4 times slower than without wholeStage codegen. Please check the test case code. Please note that the UDF is there only to prevent elimination optimizations that could be applied to literals. 
{code:scala} import org.apache.spark.sql.functions._ import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED object SPARK_23791 { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .master("local[4]") .appName("test") .getOrCreate() def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame) = (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value)) val dummy = udf(() => Option.empty[Int]) def test(cnt: Int = 50, rows: Int = 5000000, grps: Int = 1000): Double = { val t0 = System.nanoTime() spark.range(rows).toDF() .withColumn("grp", col("id").mod(grps)) .transform(addConstColumns("null_", cnt, dummy())) .groupBy("grp") .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*) .collect() val t1 = System.nanoTime() (t1 - t0) / 1e9 } val timings = for (i <- 1 to 3) yield { spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true) val with_wholestage = test() spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false) val without_wholestage = test() (with_wholestage, without_wholestage) } timings.foreach(println) println("Press enter ...") System.in.read() } } {code} > Sub-optimal generated code for sum aggregating > ---------------------------------------------- > > Key: SPARK-23791 > URL: https://issues.apache.org/jira/browse/SPARK-23791 > Project: Spark > Issue Type: Bug > Components: Optimizer > Affects Versions: 2.2.0, 2.3.0 > Reporter: Valentin Nikotin > Priority: Major > Labels: performance > Original Estimate: 24h > Remaining Estimate: 24h > > It appears that, with wholeStage codegen enabled, a simple Spark job > performing a sum aggregation of 50 columns runs ~4 times slower than without > wholeStage codegen. > Please check the test case code. Please note that the UDF is there only to prevent > elimination optimizations that could be applied to literals. 
> {code:scala} > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.{Column, DataFrame, SparkSession} > import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED > object SPARK_23791 { > def main(args: Array[String]): Unit = { > val spark = SparkSession > .builder() > .master("local[4]") > .appName("test") > .getOrCreate() > def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: > DataFrame) = > (0 until cnt).foldLeft(inputDF)((df, idx) => > df.withColumn(s"$prefix$idx", value)) > val dummy = udf(() => Option.empty[Int]) > def test(cnt: Int = 50, rows: Int = 5000000, grps: Int = 1000): Double = { > val t0 = System.nanoTime() > spark.range(rows).toDF() > .withColumn("grp", col("id").mod(grps)) > .transform(addConstColumns("null_", cnt, dummy())) > .groupBy("grp") > .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*) > .collect() > val t1 = System.nanoTime() > (t1 - t0) / 1e9 > } > val timings = for (i <- 1 to 3) yield { > spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true) > val with_wholestage = test() > spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false) > val without_wholestage = test() > (with_wholestage, without_wholestage) > } > timings.foreach(println) > println("Press enter ...") > System.in.read() > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org