Repository: spark
Updated Branches:
  refs/heads/master c7270a46f -> 32fa0b814


[SPARK-21781][SQL] Modify DataSourceScanExec to use concrete ColumnVector type.

## What changes were proposed in this pull request?

As mentioned at
https://github.com/apache/spark/pull/18680#issuecomment-316820409, once there are
more `ColumnVector` implementations, generated code that refers to the abstract
type may (or may not) take a significant performance hit, because the extra
implementations can prevent inlining or force virtual dispatch.

On the read path, one of the major code paths is the one generated by
`ColumnarBatchScan`. It currently refers to the abstract `ColumnVector` type, so
the penalty grows as more implementations are added. However, the concrete type
is usually known at the usage site, e.g. the vectorized Parquet reader uses
`OnHeapColumnVector`, so we can refer to that concrete type directly in the
generated code and avoid the penalty.
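
As an illustration (not part of this patch), a file format that always reads into
`OnHeapColumnVector` could advertise that concrete type through the new
`vectorTypes` hook roughly as follows; the trait name is hypothetical, and the
shape mirrors the `ParquetFileFormat` override in the diff below:

```scala
// Sketch only: a hypothetical trait advertising a concrete vector type per column.
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.StructType

trait OnHeapVectorizedFormat extends FileFormat {
  // One concrete class name per output column: required columns followed by
  // partition columns, matching the layout of the returned columnar batch.
  override def vectorTypes(
      requiredSchema: StructType,
      partitionSchema: StructType): Option[Seq[String]] = {
    Some(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
      classOf[OnHeapColumnVector].getName))
  }
}
```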

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes #18989 from ueshin/issues/SPARK-21781.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32fa0b81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32fa0b81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32fa0b81

Branch: refs/heads/master
Commit: 32fa0b81411f781173e185f4b19b9fd6d118f9fe
Parents: c7270a4
Author: Takuya UESHIN <[email protected]>
Authored: Tue Aug 29 20:16:45 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Tue Aug 29 20:16:45 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/ColumnarBatchScan.scala       | 14 +++++++++-----
 .../spark/sql/execution/DataSourceScanExec.scala      |  5 +++++
 .../spark/sql/execution/datasources/FileFormat.scala  | 10 ++++++++++
 .../datasources/parquet/ParquetFileFormat.scala       |  8 ++++++++
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 74a47da..1afe83e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -33,6 +33,8 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
   val inMemoryTableScan: InMemoryTableScanExec = null
 
+  def vectorTypes: Option[Seq[String]] = None
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
     "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
@@ -79,17 +81,19 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     val scanTimeTotalNs = ctx.freshName("scanTime")
     ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
 
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val columnarBatchClz = classOf[ColumnarBatch].getName
     val batch = ctx.freshName("batch")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
 
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val idx = ctx.freshName("batchIdx")
     ctx.addMutableState("int", idx, s"$idx = 0;")
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
+    val columnVectorClzs = vectorTypes.getOrElse(
+      Seq.fill(colVars.size)(classOf[ColumnVector].getName))
+    val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map {
+      case ((name, columnVectorClz), i) =>
+        ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+        s"$name = ($columnVectorClz) $batch.column($i);"
     }
 
     val nextBatch = ctx.freshName("nextBatch")
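
For reference, a minimal standalone sketch (column names and values below are
illustrative, not taken from a real plan) of how the per-column assignment strings
above come out once a concrete vector type is supplied; in the actual code the
mutable field is also declared with that concrete type via `ctx.addMutableState`:

```scala
// Sketch only: illustrative inputs, not from a real query plan.
val batch = "batch"
val colVars = Seq("colInstance0", "colInstance1")

// What a source such as the vectorized Parquet reader would report via vectorTypes.
val vectorTypes: Option[Seq[String]] =
  Some(Seq.fill(colVars.size)(
    "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"))

// Fall back to the abstract ColumnVector when no concrete type is known.
val columnVectorClzs = vectorTypes.getOrElse(
  Seq.fill(colVars.size)("org.apache.spark.sql.execution.vectorized.ColumnVector"))

val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map {
  case ((name, clz), i) => s"$name = ($clz) $batch.column($i);"
}
// columnAssigns(0) ==
//   "colInstance0 = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) batch.column(0);"
```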

http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 588c937..77e6dbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -174,6 +174,11 @@ case class FileSourceScanExec(
     false
   }
 
+  override def vectorTypes: Option[Seq[String]] =
+    relation.fileFormat.vectorTypes(
+      requiredSchema = requiredSchema,
+      partitionSchema = relation.partitionSchema)
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()

http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index dacf462..e5a7aee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -65,6 +65,16 @@ trait FileFormat {
   }
 
   /**
+   * Returns concrete column vector class names for each column to be used in a columnar batch
+   * if this format supports returning columnar batch.
+   */
+  def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    None
+  }
+
+  /**
    * Returns whether a file with `path` could be splitted or not.
    */
   def isSplitable(

http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 64eea26..e1e7405 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -272,6 +273,13 @@ class ParquetFileFormat
       schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
+  override def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
+      classOf[OnHeapColumnVector].getName))
+  }
+
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],
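
As a side note (not part of the patch), one way to eyeball the effect is to dump
the generated code for a Parquet scan; with this change the column variables
should be declared and cast as `OnHeapColumnVector` rather than `ColumnVector`.
The session setup and output path below are illustrative:

```scala
// Sketch only: inspect whole-stage generated code for a Parquet scan.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("vector-types-demo")
  .getOrCreate()

spark.range(10).selectExpr("id", "id * 2 AS doubled")
  .write.mode("overwrite").parquet("/tmp/vector-types-demo")

// The printed code should now reference OnHeapColumnVector directly.
spark.read.parquet("/tmp/vector-types-demo").debugCodegen()
```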

