[
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Valentin Nikotin updated SPARK-23791:
-------------------------------------
Description:
It appears that, with whole-stage codegen enabled, a simple Spark job
performing a sum aggregation over 50 nullable columns runs ~4 times slower
than with whole-stage codegen disabled.
Please see the test case code below. Note that the UDF is used only to
prevent the optimizer from applying the eliminations it could apply to literals.
{code:scala}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
/**
 * Reproducer for SPARK-23791: compares the wall-clock time of a grouped `sum`
 * over many always-null integer columns with whole-stage codegen enabled vs. disabled.
 *
 * Each of three rounds prints a tuple `(secondsWithWholeStage, secondsWithoutWholeStage)`.
 * The program then waits for a key press (e.g. to inspect the Spark UI) and finally
 * stops the session.
 */
object SPARK_23791 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[4]")
      .appName("test")
      .getOrCreate()

    try {
      /** Adds `cnt` columns named `prefix0 .. prefix{cnt-1}`, each holding `value`. */
      def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame): DataFrame =
        (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value))

      // Zero-argument UDF that always returns null. Unlike a literal, it is opaque to the
      // optimizer, so the nullable columns are not eliminated or folded away.
      val dummy = udf(() => Option.empty[Int])

      // Shared by column creation and aggregation so the two cannot drift apart.
      val prefix = "null_"

      /**
       * Runs the aggregation once and returns its wall-clock duration in seconds.
       *
       * @param cnt  number of nullable columns to create and sum; must be at least 1
       * @param rows number of input rows
       * @param grps number of distinct grouping keys
       */
      def test(cnt: Int = 50, rows: Int = 5000000, grps: Int = 1000): Double = {
        // agg() needs at least one aggregate column.
        require(cnt >= 1, s"cnt must be >= 1, got $cnt")
        val sums = (0 until cnt).map(idx => sum(s"$prefix$idx"))
        val t0 = System.nanoTime()
        spark.range(rows).toDF()
          .withColumn("grp", col("id").mod(grps))
          .transform(addConstColumns(prefix, cnt, dummy()))
          .groupBy("grp")
          .agg(sums.head, sums.tail: _*)
          .collect() // action: forces the job to actually run
        val t1 = System.nanoTime()
        (t1 - t0) / 1e9
      }

      // Public runtime-config API rather than the unstable, internal `sessionState`.
      def setWholeStage(enabled: Boolean): Unit =
        spark.conf.set(WHOLESTAGE_CODEGEN_ENABLED.key, enabled)

      val timings = for (_ <- 1 to 3) yield {
        setWholeStage(true)
        val withWholeStage = test()
        setWholeStage(false)
        val withoutWholeStage = test()
        (withWholeStage, withoutWholeStage)
      }
      timings.foreach(println)

      println("Press enter ...")
      System.in.read()
    } finally {
      // Release the local cluster's resources even if a run fails.
      spark.stop()
    }
  }
}
{code}
was:
It appears that, with whole-stage codegen enabled, a simple Spark job
performing a sum aggregation over 50 nullable columns runs ~4 times slower
than with whole-stage codegen disabled.
Please see the test case code below. Note that the UDF is used only to
prevent the optimizer from applying the eliminations it could apply to literals.
{code:java}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
/**
 * Reproducer for SPARK-23791: compares the wall-clock time of a grouped `sum`
 * over many always-null integer columns with whole-stage codegen enabled vs. disabled.
 *
 * Each of three rounds prints a tuple `(secondsWithWholeStage, secondsWithoutWholeStage)`.
 * The program then waits for a key press (e.g. to inspect the Spark UI) and finally
 * stops the session.
 */
object SPARK_23791 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[4]")
      .appName("test")
      .getOrCreate()

    try {
      /** Adds `cnt` columns named `prefix0 .. prefix{cnt-1}`, each holding `value`. */
      def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame): DataFrame =
        (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value))

      // Zero-argument UDF that always returns null. Unlike a literal, it is opaque to the
      // optimizer, so the nullable columns are not eliminated or folded away.
      val dummy = udf(() => Option.empty[Int])

      // Shared by column creation and aggregation so the two cannot drift apart.
      val prefix = "null_"

      /**
       * Runs the aggregation once and returns its wall-clock duration in seconds.
       *
       * @param cnt  number of nullable columns to create and sum; must be at least 1
       * @param rows number of input rows
       * @param grps number of distinct grouping keys
       */
      def test(cnt: Int = 50, rows: Int = 5000000, grps: Int = 1000): Double = {
        // agg() needs at least one aggregate column.
        require(cnt >= 1, s"cnt must be >= 1, got $cnt")
        val sums = (0 until cnt).map(idx => sum(s"$prefix$idx"))
        val t0 = System.nanoTime()
        spark.range(rows).toDF()
          .withColumn("grp", col("id").mod(grps))
          .transform(addConstColumns(prefix, cnt, dummy()))
          .groupBy("grp")
          .agg(sums.head, sums.tail: _*)
          .collect() // action: forces the job to actually run
        val t1 = System.nanoTime()
        (t1 - t0) / 1e9
      }

      // Public runtime-config API rather than the unstable, internal `sessionState`.
      def setWholeStage(enabled: Boolean): Unit =
        spark.conf.set(WHOLESTAGE_CODEGEN_ENABLED.key, enabled)

      val timings = for (_ <- 1 to 3) yield {
        setWholeStage(true)
        val withWholeStage = test()
        setWholeStage(false)
        val withoutWholeStage = test()
        (withWholeStage, withoutWholeStage)
      }
      timings.foreach(println)

      println("Press enter ...")
      System.in.read()
    } finally {
      // Release the local cluster's resources even if a run fails.
      spark.stop()
    }
  }
}
{code}
> Sub-optimal generated code when aggregating nullable columns
> ------------------------------------------------------------
>
> Key: SPARK-23791
> URL: https://issues.apache.org/jira/browse/SPARK-23791
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.2.0, 2.3.0
> Environment: {code:java}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.{Column, DataFrame, SparkSession}
> import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
> object TestCase { // Reproducer for SPARK-23791: times a grouped sum over many always-null columns with whole-stage codegen enabled vs. disabled.
> def main(args: Array[String]): Unit = {
> val spark = SparkSession
> .builder()
> .master("local[4]")
> .appName("test")
> .getOrCreate()
> def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: // adds cnt columns named prefix0..prefix(cnt-1), each set to value
> DataFrame) =
> (0 until cnt).foldLeft(inputDF)((df, idx) =>
> df.withColumn(s"$prefix$idx", value))
> val dummy = udf(() => Option.empty[Int]) // zero-arg UDF that always yields null; used instead of a literal so the columns are not optimized away
> def test(cnt: Int = 50, rows: Int = 5000000, grps: Int = 1000): Double = { // runs the aggregation once and returns the elapsed wall-clock time in seconds
> val t0 = System.nanoTime()
> spark.range(rows).toDF()
> .withColumn("grp", col("id").mod(grps)) // grouping key in [0, grps)
> .transform(addConstColumns("null_", cnt, dummy()))
> .groupBy("grp")
> .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*) // sums all null_ columns; null_0 is referenced directly, so cnt must be >= 1
> .collect() // action that forces the job to run
> val t1 = System.nanoTime()
> (t1 - t0) / 1e9
> }
> val timings = for (i <- 1 to 3) yield { // three rounds, each yielding (seconds with whole-stage codegen, seconds without)
> spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
> val with_wholestage = test()
> spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
> val without_wholestage = test()
> (with_wholestage, without_wholestage)
> }
> timings.foreach(println)
> println("Press enter ...")
> System.in.read() // blocks until input, presumably so the Spark UI can be inspected; the session is never stopped
> }
> }
> {code}
> Reporter: Valentin Nikotin
> Priority: Major
> Labels: performance
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> It appears that, with whole-stage codegen enabled, a simple Spark job
> performing a sum aggregation over 50 nullable columns runs ~4 times slower
> than with whole-stage codegen disabled.
> Please see the test case code below. Note that the UDF is used only to
> prevent the optimizer from applying the eliminations it could apply to literals.
> {code:scala}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.{Column, DataFrame, SparkSession}
> import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
> object SPARK_23791 { // Reproducer for SPARK-23791: times a grouped sum over many always-null columns with whole-stage codegen enabled vs. disabled.
> def main(args: Array[String]): Unit = {
> val spark = SparkSession
> .builder()
> .master("local[4]")
> .appName("test")
> .getOrCreate()
> def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: // adds cnt columns named prefix0..prefix(cnt-1), each set to value
> DataFrame) =
> (0 until cnt).foldLeft(inputDF)((df, idx) =>
> df.withColumn(s"$prefix$idx", value))
> val dummy = udf(() => Option.empty[Int]) // zero-arg UDF that always yields null; used instead of a literal so the columns are not optimized away
> def test(cnt: Int = 50, rows: Int = 5000000, grps: Int = 1000): Double = { // runs the aggregation once and returns the elapsed wall-clock time in seconds
> val t0 = System.nanoTime()
> spark.range(rows).toDF()
> .withColumn("grp", col("id").mod(grps)) // grouping key in [0, grps)
> .transform(addConstColumns("null_", cnt, dummy()))
> .groupBy("grp")
> .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*) // sums all null_ columns; null_0 is referenced directly, so cnt must be >= 1
> .collect() // action that forces the job to run
> val t1 = System.nanoTime()
> (t1 - t0) / 1e9
> }
> val timings = for (i <- 1 to 3) yield { // three rounds, each yielding (seconds with whole-stage codegen, seconds without)
> spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
> val with_wholestage = test()
> spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
> val without_wholestage = test()
> (with_wholestage, without_wholestage)
> }
> timings.foreach(println)
> println("Press enter ...")
> System.in.read() // blocks until input, presumably so the Spark UI can be inspected; the session is never stopped
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]