Repository: spark
Updated Branches:
refs/heads/master 7742d9f15 -> 7f7eb3934
[SPARK-16360][SQL] Speed up SQL query performance by removing redundant `executePlan` call
## What changes were proposed in this pull request?
There have been several reports of a Spark 2.0 query performance regression for large queries.
This PR speeds up SQL query processing by removing a redundant **consecutive `executePlan`** call made during `Dataset.ofRows` and `Dataset` instantiation. Specifically, this PR reduces the overhead of SQL query execution plan generation, not of actual query execution, so the improvement is not visible in the Spark Web UI. Please use the following query script to measure it. The result improves from **25.78 sec** to **12.36 sec**, as expected.
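The root cause can be illustrated with a toy model. The sketch below is hypothetical and heavily simplified (the class names mirror Spark's, but the bodies and the call counting are illustrative only): when the `Dataset` constructor is handed just a `LogicalPlan`, it has to build its own `QueryExecution` internally, so plan analysis runs twice per query; handing it the already-built `QueryExecution` makes analysis run once.

```scala
// Toy model of the redundant-analysis pattern (NOT Spark's actual code).
object RedundantPlanDemo {
  var planCount = 0 // counts how many times a plan is analyzed

  // Stand-in for Spark's QueryExecution: analysis happens at construction.
  class QueryExecution(val logicalPlan: String) {
    planCount += 1 // expensive analysis for large plans
    val analyzed: String = s"analyzed($logicalPlan)"
  }

  def executePlan(plan: String): QueryExecution = new QueryExecution(plan)

  // Before: ofRows builds a QueryExecution to assert analysis, but the
  // Dataset constructor, given only the logical plan, builds another one.
  def ofRowsBefore(plan: String): QueryExecution = {
    executePlan(plan) // first analysis, in ofRows
    executePlan(plan) // second analysis, inside new Dataset(session, plan, enc)
  }

  // After: the Dataset is handed the existing QueryExecution directly,
  // so the plan is analyzed exactly once.
  def ofRowsAfter(plan: String): QueryExecution = {
    executePlan(plan) // the only analysis
  }
}
```

With 4000 columns, each analysis pass is expensive, so halving the number of passes roughly halves the plan-generation time, which matches the measurements below.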
**Sample Query**
```scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  println("Elapsed time: " + ((System.nanoTime - t0) / 1e9) + "s")
  result
}
```
**Before**
```scala
scala> time(sql(query))
Elapsed time: 30.138142577s // First query has a little overhead of initialization.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
scala> time(sql(query))
Elapsed time: 25.787751452s // Let's compare this one.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
```
**After**
```scala
scala> time(sql(query))
Elapsed time: 17.500279659s // First query has a little overhead of initialization.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
scala> time(sql(query))
Elapsed time: 12.364812255s // This shows the real difference: the speedup is about 2x.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
```
## How was this patch tested?
Manually, using the script above.
Author: Dongjoon Hyun <[email protected]>
Closes #14044 from dongjoon-hyun/SPARK-16360.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f7eb393
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f7eb393
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f7eb393
Branch: refs/heads/master
Commit: 7f7eb3934ea258f2b163a87da06766bf5c7d443d
Parents: 7742d9f
Author: Dongjoon Hyun <[email protected]>
Authored: Tue Jul 5 16:19:22 2016 +0800
Committer: Cheng Lian <[email protected]>
Committed: Tue Jul 5 16:19:22 2016 +0800
----------------------------------------------------------------------
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7f7eb393/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e64669a..ededf7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -62,7 +62,7 @@ private[sql] object Dataset {
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
- new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
+ new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
}