This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a30bec1  [SPARK-37369][SQL] Avoid redundant ColumnarToRow transition 
on InMemoryTableScan
a30bec1 is described below

commit a30bec17d6187ef6dfe276c67dbd4e023062b92b
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sun Dec 12 17:49:58 2021 -0800

    [SPARK-37369][SQL] Avoid redundant ColumnarToRow transition on 
InMemoryTableScan
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to let `InMemoryTableScanExec` produce row output 
directly if its parent query plan only accepts rows instead of columnar 
output. In particular, this change adds a new method in `SparkPlan` called 
`supportsRowBased`, alongside the existing `supportsColumnar`.
    
    ### Why are the changes needed?
    
    We currently have `supportsColumnar` indicating whether a physical node can 
produce columnar output. The current columnar transition rule seems to assume 
that a node can only produce columnar output, and not row-based output, if 
`supportsColumnar` returns true. But a node can actually produce both formats, 
i.e. columnar and row-based. For such a node, if we require row-based output, 
the columnar transition rule will add an additional columnar-to-row transition 
after it due to the wrong a [...]
    
    So this change introduces `supportsRowBased`, which indicates whether 
the node can produce row-based output. The rule can check this method when 
deciding whether a columnar-to-row transition is necessary.
    
    For example, `InMemoryTableScanExec` can produce columnar output. So if its 
parent plan isn't columnar, the rule adds a `ColumnarToRow` between them, e.g.,
    
    ```
    +- Union
    :- ColumnarToRow
    : +- InMemoryTableScan i#8, j#9
    : +- InMemoryRelation i#8, j#9, StorageLevel(disk, memory, deserialized, 1 
replicas)
    ```
    
    But `InMemoryTableScanExec` is capable of row-based output too. After this 
change, in such a case, we can simply ask `InMemoryTableScanExec` to produce row 
output instead of adding a redundant conversion.
    
    ```
    
================================================================================================
    Int In-memory
    
================================================================================================
    
    OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Mac OS X 10.16
    Intel(R) Core(TM) i7-9750H CPU  2.60GHz
    Int In-Memory scan:                         Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    
--------------------------------------------------------------------------------------------------------------------------
    columnar deserialization + columnar-to-row            228            245    
      15          4.4         227.7       1.0X
    row-based deserialization                             179            187    
      10          5.6         179.4       1.3X
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #34642 from viirya/SPARK-37369.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 .../InMemoryColumnarBenchmark-results.txt          | 12 ++++
 .../org/apache/spark/sql/execution/Columnar.scala  |  4 +-
 .../org/apache/spark/sql/execution/SparkPlan.scala | 11 +++-
 .../sql/execution/columnar/InMemoryRelation.scala  |  3 +-
 .../execution/columnar/InMemoryTableScanExec.scala |  2 +
 .../org/apache/spark/sql/CachedTableSuite.scala    | 21 ++++++-
 .../spark/sql/DataFrameSetOperationsSuite.scala    |  7 +--
 .../sql/execution/WholeStageCodegenSuite.scala     |  2 +-
 .../columnar/CachedBatchSerializerSuite.scala      |  5 +-
 .../columnar/InMemoryColumnarBenchmark.scala       | 66 ++++++++++++++++++++++
 .../columnar/InMemoryColumnarQuerySuite.scala      |  6 +-
 .../spark/sql/execution/debug/DebuggingSuite.scala |  5 +-
 .../sql/execution/metric/SQLMetricsSuite.scala     |  6 +-
 13 files changed, 128 insertions(+), 22 deletions(-)

diff --git a/sql/core/benchmarks/InMemoryColumnarBenchmark-results.txt 
b/sql/core/benchmarks/InMemoryColumnarBenchmark-results.txt
new file mode 100644
index 0000000..2998d8b
--- /dev/null
+++ b/sql/core/benchmarks/InMemoryColumnarBenchmark-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+Int In-memory
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1021-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Int In-Memory scan:                         Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------
+columnar deserialization + columnar-to-row            266            276       
   12          3.8         265.9       1.0X
+row-based deserialization                             197            206       
   15          5.1         196.7       1.4X
+
+
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index d1e9168..f57e7e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -535,8 +535,8 @@ case class ApplyColumnarRulesAndInsertTransitions(
   private def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): 
SparkPlan = {
     if (outputsColumnar) {
       insertRowToColumnar(plan)
-    } else if (plan.supportsColumnar) {
-      // `outputsColumnar` is false but the plan outputs columnar format, so 
add a
+    } else if (plan.supportsColumnar && !plan.supportsRowBased) {
+      // `outputsColumnar` is false but the plan only outputs columnar format, 
so add a
       // to-row transition here.
       ColumnarToRowExec(insertRowToColumnar(plan))
     } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 5c4266d..f56beeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -72,7 +72,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
   val id: Int = SparkPlan.newPlanId()
 
   /**
-   * Return true if this stage of the plan supports columnar execution.
+   * Return true if this stage of the plan supports row-based execution. A plan
+   * can also support columnar execution (see `supportsColumnar`). Spark will 
decide
+   * which execution to be called during query planning.
+   */
+  def supportsRowBased: Boolean = !supportsColumnar
+
+  /**
+   * Return true if this stage of the plan supports columnar execution. A plan
+   * can also support row-based execution (see `supportsRowBased`). Spark will 
decide
+   * which execution to be called during query planning.
    */
   def supportsColumnar: Boolean = false
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 525653c..89323e7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -256,7 +256,8 @@ case class CachedRDDBuilder(
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
-    val cb = if (cachedPlan.supportsColumnar) {
+    val cb = if (cachedPlan.supportsColumnar &&
+        serializer.supportsColumnarInput(cachedPlan.output)) {
       serializer.convertColumnarBatchToCachedBatch(
         cachedPlan.executeColumnar(),
         cachedPlan.output,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 3f97fbc..da9316e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -55,6 +55,8 @@ case class InMemoryTableScanExec(
   override def vectorTypes: Option[Seq[String]] =
     relation.cacheBuilder.serializer.vectorTypes(attributes, conf)
 
+  override def supportsRowBased: Boolean = true
+
   /**
    * If true, get data from ColumnVector in ColumnarBatch, which are generally 
faster.
    * If false, get data from UnsafeRow build from CachedBatch
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index bad2815..52803e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, 
JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.execution.{ExecSubqueryExpression, RDDScanExec, 
SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, 
ExecSubqueryExpression, RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -881,8 +881,10 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   test("SPARK-23312: vectorized cache reader can be disabled") {
     Seq(true, false).foreach { vectorized =>
       withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> 
vectorized.toString) {
-        val df = spark.range(10).cache()
-        df.queryExecution.executedPlan.foreach {
+        val df1 = spark.range(10).cache()
+        val df2 = spark.range(10).cache()
+        val union = df1.union(df2)
+        union.queryExecution.executedPlan.foreach {
           case i: InMemoryTableScanExec =>
             assert(i.supportsColumnar == vectorized)
           case _ =>
@@ -891,6 +893,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
     }
   }
 
+  test("SPARK-37369: Avoid redundant ColumnarToRow transition on 
InMemoryTableScan") {
+    Seq(true, false).foreach { vectorized =>
+      withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> 
vectorized.toString) {
+        val cache = spark.range(10).cache()
+        val df = cache.filter($"id" > 0)
+        val columnarToRow = df.queryExecution.executedPlan.collect {
+          case c: ColumnarToRowExec => c
+        }
+        assert(columnarToRow.isEmpty)
+      }
+    }
+  }
+
   private def checkIfNoJobTriggered[T](f: => T): T = {
     var numJobTriggered = 0
     val jobListener = new SparkListener {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 26df517..b19e430 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -1375,12 +1375,11 @@ class DataFrameSetOperationsSuite extends QueryTest 
with SharedSparkSession {
         val df1 = Seq(1, 2, 3).toDF("i").cache()
         val df2 = Seq(4, 5, 6).toDF("j").cache()
 
-        checkIfColumnar(df1.queryExecution.executedPlan,
+        val union = df1.union(df2)
+        checkIfColumnar(union.queryExecution.executedPlan,
           _.isInstanceOf[InMemoryTableScanExec], supported)
-        checkIfColumnar(df2.queryExecution.executedPlan,
+        checkIfColumnar(union.queryExecution.executedPlan,
           _.isInstanceOf[InMemoryTableScanExec], supported)
-
-        val union = df1.union(df2)
         checkIfColumnar(union.queryExecution.executedPlan, 
_.isInstanceOf[UnionExec], supported)
         checkAnswer(union, Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: 
Row(6) :: Nil)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 00ea371..55ca1e8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -467,7 +467,7 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
     val planInt = dsIntFilter.queryExecution.executedPlan
     assert(planInt.collect {
       case WholeStageCodegenExec(FilterExec(_,
-          ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec)))) => ()
+          InputAdapter(_: InMemoryTableScanExec))) => ()
     }.length == 1)
     assert(dsIntFilter.collect() === Array(1, 2))
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
index 099a1aa..9f7304c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
@@ -138,8 +138,9 @@ class CachedBatchSerializerSuite  extends QueryTest with 
SharedSparkSession {
       input.write.parquet(workDirPath)
       val data = spark.read.parquet(workDirPath)
       data.cache()
-      assert(data.count() == 3)
-      checkAnswer(data, Row(100) :: Row(200) :: Row(300) :: Nil)
+      val df = data.union(data)
+      assert(df.count() == 6)
+      checkAnswer(df, Row(100) :: Row(200) :: Row(300) :: Row(100) :: Row(200) 
:: Row(300) :: Nil)
     }
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala
new file mode 100644
index 0000000..b975451
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarBenchmark.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.columnar
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.execution.ColumnarToRowExec
+import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+
+/**
+ * Benchmark to low level memory access using different ways to manage buffers.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class>
+ *        --jars <spark core test jar> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt 
"sql/test:runMain <this class>"
+ *      Results will be written to 
"benchmarks/InMemoryColumnarBenchmark-results.txt".
+ * }}}
+ */
+object InMemoryColumnarBenchmark extends SqlBasedBenchmark {
+  def intCache(rowsNum: Int, numIters: Int): Unit = {
+    val data = spark.range(0, rowsNum, 1, 1).toDF("i").cache()
+
+    val inMemoryScan = data.queryExecution.executedPlan.collect {
+      case m: InMemoryTableScanExec => m
+    }
+
+    val columnarScan = ColumnarToRowExec(inMemoryScan(0))
+    val rowScan = inMemoryScan(0)
+
+    assert(inMemoryScan.size == 1)
+
+    val benchmark = new Benchmark("Int In-Memory scan", rowsNum, output = 
output)
+
+    benchmark.addCase("columnar deserialization + columnar-to-row", numIters) 
{ _ =>
+      columnarScan.executeCollect()
+    }
+
+    benchmark.addCase("row-based deserialization", numIters) { _ =>
+      rowScan.executeCollect()
+    }
+
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("Int In-memory") {
+      intCache(rowsNum = 1000000, numIters = 3)
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index b8f73f4..2cf12dd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, In}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.columnar.CachedBatch
-import org.apache.spark.sql.execution.{ColumnarToRowExec, FilterExec, 
InputAdapter, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{FilterExec, InputAdapter, 
WholeStageCodegenExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -504,8 +504,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSparkSession {
     val df2 = df1.where("y = 3")
 
     val planBeforeFilter = df2.queryExecution.executedPlan.collect {
-      case FilterExec(_, c: ColumnarToRowExec) => c.child
-      case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: 
InputAdapter))) => i.child
+      case f: FilterExec => f.child
+      case WholeStageCodegenExec(FilterExec(_, i: InputAdapter)) => i.child
     }
     assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec])
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 5085c74..1b894dd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -125,8 +125,9 @@ class DebuggingSuite extends DebuggingSuiteBase with 
DisableAdaptiveExecutionSui
   }
 
   test("SPARK-28537: DebugExec cannot debug columnar related queries") {
-    val df = spark.range(5)
-    df.persist()
+    val base = spark.range(5)
+    base.persist()
+    val df = base.union(base)
 
     val captured = new ByteArrayOutputStream()
     Console.withOut(captured) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a51003e..162531ba 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -743,19 +743,19 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
       sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1")
       sql("CACHE TABLE inMemoryTable")
       testSparkPlanMetrics(spark.table("inMemoryTable"), 1,
-        Map(1L -> (("Scan In-memory table inMemoryTable", Map.empty)))
+        Map(0L -> (("Scan In-memory table inMemoryTable", Map.empty)))
       )
 
       sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1")
       sql("CACHE TABLE ```a``b```")
       testSparkPlanMetrics(spark.table("```a``b```"), 1,
-        Map(1L -> (("Scan In-memory table ```a``b```", Map.empty)))
+        Map(0L -> (("Scan In-memory table ```a``b```", Map.empty)))
       )
     }
 
     // Show InMemoryTableScan on UI
     testSparkPlanMetrics(spark.range(1).cache().select("id"), 1,
-      Map(1L -> (("InMemoryTableScan", Map.empty)))
+      Map(0L -> (("InMemoryTableScan", Map.empty)))
     )
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to