This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 3551f8b89f1 [SPARK-45306][SQL][TESTS] Make `InMemoryColumnarBenchmark` use AQE-aware utils to collect plans
3551f8b89f1 is described below
commit 3551f8b89f1d70a9218b8c0331bddc06c5020e95
Author: yangjie01 <[email protected]>
AuthorDate: Mon Sep 25 19:06:02 2023 +0800
[SPARK-45306][SQL][TESTS] Make `InMemoryColumnarBenchmark` use AQE-aware utils to collect plans
### What changes were proposed in this pull request?
This PR makes `InMemoryColumnarBenchmark` inherit from
`AdaptiveSparkPlanHelper` and use the `AdaptiveSparkPlanHelper#collect` function
to collect plans, so that `InMemoryColumnarBenchmark` runs successfully.
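The difference between the two kinds of traversal can be shown with a small, self-contained Scala toy model. The types `Plan`, `ScanNode`, `ProjectNode`, `AdaptiveWrapper`, the trait `AqeAwareHelper` and the object `CollectDemo` are all hypothetical stand-ins, not Spark classes. The model relies on two properties of Spark itself. First, `AdaptiveSparkPlanExec` is a leaf node that keeps its current plan in an `executedPlan` field. Second, `AdaptiveSparkPlanHelper` descends into that field (and into a query stage's `plan`), whereas a plain `TreeNode.collect` only follows `children`.

```scala
// Toy model with hypothetical types (not Spark classes): a plain traversal
// stops at a wrapper leaf, while an AQE-aware traversal looks inside it.
sealed trait Plan {
  def children: Seq[Plan]

  // Plain traversal: visits this node and its declared children only,
  // similar in spirit to TreeNode.collect.
  def collect[B](pf: PartialFunction[Plan, B]): Seq[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}

final case class ScanNode(name: String) extends Plan {
  val children: Seq[Plan] = Nil
}

final case class ProjectNode(child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
}

// Stand-in for AdaptiveSparkPlanExec: it has no declared children, and the
// real plan lives in a separate field.
final case class AdaptiveWrapper(executedPlan: Plan) extends Plan {
  val children: Seq[Plan] = Nil
}

// Hypothetical helper mirroring the idea behind AdaptiveSparkPlanHelper:
// it treats the wrapper's inner plan as its child.
trait AqeAwareHelper {
  private def allChildren(p: Plan): Seq[Plan] = p match {
    case a: AdaptiveWrapper => Seq(a.executedPlan)
    case other              => other.children
  }

  def collect[B](p: Plan)(pf: PartialFunction[Plan, B]): Seq[B] =
    pf.lift(p).toList ++ allChildren(p).flatMap(c => collect(c)(pf))
}

object CollectDemo extends AqeAwareHelper {
  def main(args: Array[String]): Unit = {
    val plan: Plan = AdaptiveWrapper(ProjectNode(ScanNode("cached")))
    val plain = plan.collect { case s: ScanNode => s }   // misses the scan
    val aware = collect(plan) { case s: ScanNode => s }  // finds the scan
    println(s"plain=${plain.size} aware=${aware.size}")
  }
}
```

Running `CollectDemo` prints `plain=0 aware=1`. Calling `plain(0)` on that empty result would throw `java.lang.IndexOutOfBoundsException: 0`. This is the same exception class that appears in the benchmark failure below.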
### Why are the changes needed?
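After SPARK-42768 was merged, the default value of
`spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` changed from
false to true. With the new default, the executed plan of the cached DataFrame
can be wrapped in `AdaptiveSparkPlanExec`, which a plain `collect` does not
descend into. As a result, no `InMemoryTableScanExec` was found, so
`InMemoryColumnarBenchmark` should use AQE-aware utils to collect plans.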
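For comparison, a session can explicitly set the config back to its pre-SPARK-42768 default of `false`. The sketch below assumes a Spark 3.5 build is on the classpath, and the object name `OldCachedPlanDefault` is hypothetical. This commit does not take that route. It keeps the new default and makes the benchmark AQE-aware instead.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; assumes spark-sql 3.5 is on the classpath.
object OldCachedPlanDefault {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      // Override the new default (true) for this session only.
      .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
      .getOrCreate()
    try {
      println(spark.conf.get("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning"))
    } finally {
      spark.stop()
    }
  }
}
```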
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual verification: run
`build/sbt "sql/Test/runMain org.apache.spark.sql.execution.columnar.InMemoryColumnarBenchmark"`
**Before**
```
[error] Exception in thread "main" java.lang.IndexOutOfBoundsException: 0
[error] at scala.collection.LinearSeqOps.apply(LinearSeq.scala:131)
[error] at scala.collection.LinearSeqOps.apply$(LinearSeq.scala:128)
[error] at scala.collection.immutable.List.apply(List.scala:79)
[error] at org.apache.spark.sql.execution.columnar.InMemoryColumnarBenchmark$.intCache(InMemoryColumnarBenchmark.scala:47)
[error] at org.apache.spark.sql.execution.columnar.InMemoryColumnarBenchmark$.$anonfun$runBenchmarkSuite$1(InMemoryColumnarBenchmark.scala:68)
[error] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[error] at org.apache.spark.benchmark.BenchmarkBase.runBenchmark(BenchmarkBase.scala:42)
[error] at org.apache.spark.sql.execution.columnar.InMemoryColumnarBenchmark$.runBenchmarkSuite(InMemoryColumnarBenchmark.scala:68)
[error] at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:72)
[error] at org.apache.spark.sql.execution.columnar.InMemoryColumnarBenchmark.main(InMemoryColumnarBenchmark.scala)
[error] Nonzero exit code returned from runner: 1
[error] (sql / Test / runMain) Nonzero exit code returned from runner: 1
```
**After**
```
[info] OpenJDK 64-Bit Server VM 17.0.8+7-LTS on Mac OS X 13.5.2
[info] Apple M2 Max
[info] Int In-Memory scan:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] --------------------------------------------------------------------------------------------------------------------------
[info] columnar deserialization + columnar-to-row               95            116          34         10.5          95.4       1.0X
[info] row-based deserialization                                85             99          22         11.8          85.1       1.1X
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43093 from LuciferYang/fix-InMemoryColumnarBenchmark.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 7e9666be15b5210db00231faacd3cfa15ed71907)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala
index 55d9fb27317..1f132dabd28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.columnar
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.execution.ColumnarToRowExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
/**
@@ -33,11 +34,11 @@ import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
* Results will be written to "benchmarks/InMemoryColumnarBenchmark-results.txt".
* }}}
*/
-object InMemoryColumnarBenchmark extends SqlBasedBenchmark {
+object InMemoryColumnarBenchmark extends SqlBasedBenchmark with AdaptiveSparkPlanHelper {
def intCache(rowsNum: Long, numIters: Int): Unit = {
val data = spark.range(0, rowsNum, 1, 1).toDF("i").cache()
- val inMemoryScan = data.queryExecution.executedPlan.collect {
+ val inMemoryScan = collect(data.queryExecution.executedPlan) {
case m: InMemoryTableScanExec => m
}