This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a30bec1 [SPARK-37369][SQL] Avoid redundant ColumnarToRow transition on InMemoryTableScan
a30bec1 is described below
commit a30bec17d6187ef6dfe276c67dbd4e023062b92b
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sun Dec 12 17:49:58 2021 -0800
[SPARK-37369][SQL] Avoid redundant ColumnarToRow transition on InMemoryTableScan
### What changes were proposed in this pull request?
This patch proposes to let `InMemoryTableScanExec` produce row output
directly when its parent query plan only accepts rows instead of columnar
output. In particular, this change adds a new method to `SparkPlan`,
`supportsRowBased`, alongside the existing `supportsColumnar`.
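A rough sketch of the pair of hooks (illustrative only; `SketchPlan` is a
stand-in name, not the actual `SparkPlan` code, which is in the
`SparkPlan.scala` diff below):
```
// Sketch only: SketchPlan stands in for SparkPlan.
trait SketchPlan {
  // Existing hook: true if this node can produce columnar output.
  def supportsColumnar: Boolean = false

  // New hook: true if this node can produce row-based output. The default
  // keeps prior behavior: a node is row-based unless it is columnar-only.
  def supportsRowBased: Boolean = !supportsColumnar
}
```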
### Why are the changes needed?
We currently have `supportsColumnar`, which indicates whether a physical node
can produce columnar output. The current columnar transition rule seems to
assume that a node whose `supportsColumnar` returns true can only produce
columnar output, not row-based output. But a node can actually produce both
formats, i.e. columnar and row-based. For such a node, if we require row-based
output, the columnar transition rule will add an additional columnar-to-row
transition after it due to the wrong a [...]
So this change introduces `supportsRowBased`, which indicates whether the node
can produce row-based output. The rule can check this method when deciding
whether a columnar-to-row transition is necessary.
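Sketched in isolation (a simplified stand-in: `Plan` and `ToRow` here play the
roles of `SparkPlan` and `ColumnarToRowExec`; the actual change is in the
`Columnar.scala` diff below):
```
object TransitionSketch {
  sealed trait Plan {
    def supportsColumnar: Boolean
    def supportsRowBased: Boolean
  }

  // Stand-in for the ColumnarToRowExec transition node.
  final case class ToRow(child: Plan) extends Plan {
    def supportsColumnar: Boolean = false
    def supportsRowBased: Boolean = true
  }

  // Insert a to-row transition only when columnar is the node's sole output
  // format; a node that also supports rows is left alone.
  def insertToRowTransition(plan: Plan): Plan =
    if (plan.supportsColumnar && !plan.supportsRowBased) ToRow(plan)
    else plan
}
```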
For example, `InMemoryTableScanExec` can produce columnar output. So if its
parent plan isn't columnar, the rule adds a `ColumnarToRow` between them, e.g.,
```
+- Union
:- ColumnarToRow
: +- InMemoryTableScan i#8, j#9
: +- InMemoryRelation i#8, j#9, StorageLevel(disk, memory, deserialized, 1 replicas)
```
But `InMemoryTableScanExec` is capable of row-based output too. After this
change, in such a case we can simply ask `InMemoryTableScanExec` to produce row
output directly, with no redundant conversion.
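For illustration (not actual explain output), the plan above would become:
```
+- Union
:- InMemoryTableScan i#8, j#9
:     +- InMemoryRelation i#8, j#9, StorageLevel(disk, memory, deserialized, 1 replicas)
```
A micro-benchmark, added in this PR as `InMemoryColumnarBenchmark`, compares
the two deserialization paths: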
```
================================================================================================
Int In-memory
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Mac OS X 10.16
Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
Int In-Memory scan:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------
columnar deserialization + columnar-to-row             228            245          15          4.4         227.7       1.0X
row-based deserialization                              179            187          10          5.6         179.4       1.3X
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #34642 from viirya/SPARK-37369.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../InMemoryColumnarBenchmark-results.txt | 12 ++++
.../org/apache/spark/sql/execution/Columnar.scala | 4 +-
.../org/apache/spark/sql/execution/SparkPlan.scala | 11 +++-
.../sql/execution/columnar/InMemoryRelation.scala | 3 +-
.../execution/columnar/InMemoryTableScanExec.scala | 2 +
.../org/apache/spark/sql/CachedTableSuite.scala | 21 ++++++-
.../spark/sql/DataFrameSetOperationsSuite.scala | 7 +--
.../sql/execution/WholeStageCodegenSuite.scala | 2 +-
.../columnar/CachedBatchSerializerSuite.scala | 5 +-
.../columnar/InMemoryColumnarBenchmark.scala | 66 ++++++++++++++++++++++
.../columnar/InMemoryColumnarQuerySuite.scala | 6 +-
.../spark/sql/execution/debug/DebuggingSuite.scala | 5 +-
.../sql/execution/metric/SQLMetricsSuite.scala | 6 +-
13 files changed, 128 insertions(+), 22 deletions(-)
diff --git a/sql/core/benchmarks/InMemoryColumnarBenchmark-results.txt b/sql/core/benchmarks/InMemoryColumnarBenchmark-results.txt
new file mode 100644
index 0000000..2998d8b
--- /dev/null
+++ b/sql/core/benchmarks/InMemoryColumnarBenchmark-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+Int In-memory
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1021-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Int In-Memory scan:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------
+columnar deserialization + columnar-to-row             266            276          12          3.8         265.9       1.0X
+row-based deserialization                              197            206          15          5.1         196.7       1.4X
+
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index d1e9168..f57e7e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -535,8 +535,8 @@ case class ApplyColumnarRulesAndInsertTransitions(
private def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
if (outputsColumnar) {
insertRowToColumnar(plan)
- } else if (plan.supportsColumnar) {
- // `outputsColumnar` is false but the plan outputs columnar format, so add a
+ } else if (plan.supportsColumnar && !plan.supportsRowBased) {
+ // `outputsColumnar` is false but the plan only outputs columnar format, so add a
// to-row transition here.
ColumnarToRowExec(insertRowToColumnar(plan))
} else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 5c4266d..f56beeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -72,7 +72,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
val id: Int = SparkPlan.newPlanId()
/**
- * Return true if this stage of the plan supports columnar execution.
+ * Return true if this stage of the plan supports row-based execution. A plan
+ * can also support columnar execution (see `supportsColumnar`). Spark will decide
+ * which execution to be called during query planning.
+ */
+ def supportsRowBased: Boolean = !supportsColumnar
+
+ /**
+ * Return true if this stage of the plan supports columnar execution. A plan
+ * can also support row-based execution (see `supportsRowBased`). Spark will decide
+ * which execution to be called during query planning.
*/
def supportsColumnar: Boolean = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 525653c..89323e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -256,7 +256,8 @@ case class CachedRDDBuilder(
}
private def buildBuffers(): RDD[CachedBatch] = {
- val cb = if (cachedPlan.supportsColumnar) {
+ val cb = if (cachedPlan.supportsColumnar &&
+ serializer.supportsColumnarInput(cachedPlan.output)) {
serializer.convertColumnarBatchToCachedBatch(
cachedPlan.executeColumnar(),
cachedPlan.output,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 3f97fbc..da9316e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -55,6 +55,8 @@ case class InMemoryTableScanExec(
override def vectorTypes: Option[Seq[String]] =
relation.cacheBuilder.serializer.vectorTypes(attributes, conf)
+ override def supportsRowBased: Boolean = true
+
/**
* If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
* If false, get data from UnsafeRow build from CachedBatch
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index bad2815..52803e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.execution.{ExecSubqueryExpression, RDDScanExec, SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -881,8 +881,10 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
test("SPARK-23312: vectorized cache reader can be disabled") {
Seq(true, false).foreach { vectorized =>
withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
- val df = spark.range(10).cache()
- df.queryExecution.executedPlan.foreach {
+ val df1 = spark.range(10).cache()
+ val df2 = spark.range(10).cache()
+ val union = df1.union(df2)
+ union.queryExecution.executedPlan.foreach {
case i: InMemoryTableScanExec =>
assert(i.supportsColumnar == vectorized)
case _ =>
@@ -891,6 +893,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
}
}
+ test("SPARK-37369: Avoid redundant ColumnarToRow transition on InMemoryTableScan") {
+ Seq(true, false).foreach { vectorized =>
+ withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+ val cache = spark.range(10).cache()
+ val df = cache.filter($"id" > 0)
+ val columnarToRow = df.queryExecution.executedPlan.collect {
+ case c: ColumnarToRowExec => c
+ }
+ assert(columnarToRow.isEmpty)
+ }
+ }
+ }
+
private def checkIfNoJobTriggered[T](f: => T): T = {
var numJobTriggered = 0
val jobListener = new SparkListener {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 26df517..b19e430 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -1375,12 +1375,11 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
val df1 = Seq(1, 2, 3).toDF("i").cache()
val df2 = Seq(4, 5, 6).toDF("j").cache()
- checkIfColumnar(df1.queryExecution.executedPlan,
+ val union = df1.union(df2)
+ checkIfColumnar(union.queryExecution.executedPlan,
_.isInstanceOf[InMemoryTableScanExec], supported)
- checkIfColumnar(df2.queryExecution.executedPlan,
+ checkIfColumnar(union.queryExecution.executedPlan,
_.isInstanceOf[InMemoryTableScanExec], supported)
-
- val union = df1.union(df2)
checkIfColumnar(union.queryExecution.executedPlan,
_.isInstanceOf[UnionExec], supported)
checkAnswer(union, Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: Row(6) :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 00ea371..55ca1e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -467,7 +467,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
val planInt = dsIntFilter.queryExecution.executedPlan
assert(planInt.collect {
case WholeStageCodegenExec(FilterExec(_,
- ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec)))) => ()
+ InputAdapter(_: InMemoryTableScanExec))) => ()
}.length == 1)
assert(dsIntFilter.collect() === Array(1, 2))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
index 099a1aa..9f7304c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
@@ -138,8 +138,9 @@ class CachedBatchSerializerSuite extends QueryTest with SharedSparkSession {
input.write.parquet(workDirPath)
val data = spark.read.parquet(workDirPath)
data.cache()
- assert(data.count() == 3)
- checkAnswer(data, Row(100) :: Row(200) :: Row(300) :: Nil)
+ val df = data.union(data)
+ assert(df.count() == 6)
+ checkAnswer(df, Row(100) :: Row(200) :: Row(300) :: Row(100) :: Row(200) :: Row(300) :: Nil)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala
new file mode 100644
index 0000000..b975451
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.columnar
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.execution.ColumnarToRowExec
+import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+
+/**
+ * Benchmark to low level memory access using different ways to manage buffers.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class>
+ * --jars <spark core test jar> <spark sql test jar>
+ * 2. build/sbt "sql/test:runMain <this class>"
+ * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *    Results will be written to "benchmarks/InMemoryColumnarBenchmark-results.txt".
+ * }}}
+ */
+object InMemoryColumnarBenchmark extends SqlBasedBenchmark {
+ def intCache(rowsNum: Int, numIters: Int): Unit = {
+ val data = spark.range(0, rowsNum, 1, 1).toDF("i").cache()
+
+ val inMemoryScan = data.queryExecution.executedPlan.collect {
+ case m: InMemoryTableScanExec => m
+ }
+
+ val columnarScan = ColumnarToRowExec(inMemoryScan(0))
+ val rowScan = inMemoryScan(0)
+
+ assert(inMemoryScan.size == 1)
+
+ val benchmark = new Benchmark("Int In-Memory scan", rowsNum, output = output)
+
+ benchmark.addCase("columnar deserialization + columnar-to-row", numIters) { _ =>
+ columnarScan.executeCollect()
+ }
+
+ benchmark.addCase("row-based deserialization", numIters) { _ =>
+ rowScan.executeCollect()
+ }
+
+ benchmark.run()
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ runBenchmark("Int In-memory") {
+ intCache(rowsNum = 1000000, numIters = 3)
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index b8f73f4..2cf12dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, In}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.columnar.CachedBatch
-import org.apache.spark.sql.execution.{ColumnarToRowExec, FilterExec, InputAdapter, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{FilterExec, InputAdapter, WholeStageCodegenExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -504,8 +504,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession {
val df2 = df1.where("y = 3")
val planBeforeFilter = df2.queryExecution.executedPlan.collect {
- case FilterExec(_, c: ColumnarToRowExec) => c.child
- case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: InputAdapter))) => i.child
+ case f: FilterExec => f.child
+ case WholeStageCodegenExec(FilterExec(_, i: InputAdapter)) => i.child
}
assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 5085c74..1b894dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -125,8 +125,9 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui
}
test("SPARK-28537: DebugExec cannot debug columnar related queries") {
- val df = spark.range(5)
- df.persist()
+ val base = spark.range(5)
+ base.persist()
+ val df = base.union(base)
val captured = new ByteArrayOutputStream()
Console.withOut(captured) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a51003e..162531ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -743,19 +743,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1")
sql("CACHE TABLE inMemoryTable")
testSparkPlanMetrics(spark.table("inMemoryTable"), 1,
- Map(1L -> (("Scan In-memory table inMemoryTable", Map.empty)))
+ Map(0L -> (("Scan In-memory table inMemoryTable", Map.empty)))
)
sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1")
sql("CACHE TABLE ```a``b```")
testSparkPlanMetrics(spark.table("```a``b```"), 1,
- Map(1L -> (("Scan In-memory table ```a``b```", Map.empty)))
+ Map(0L -> (("Scan In-memory table ```a``b```", Map.empty)))
)
}
// Show InMemoryTableScan on UI
testSparkPlanMetrics(spark.range(1).cache().select("id"), 1,
- Map(1L -> (("InMemoryTableScan", Map.empty)))
+ Map(0L -> (("InMemoryTableScan", Map.empty)))
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]