Repository: spark
Updated Branches:
  refs/heads/master c7270a46f -> 32fa0b814
[SPARK-21781][SQL] Modify DataSourceScanExec to use concrete ColumnVector type.

## What changes were proposed in this pull request?

As mentioned at https://github.com/apache/spark/pull/18680#issuecomment-316820409, when we have more `ColumnVector` implementations, there might (or might not) be significant performance implications, because the extra subclasses can disable inlining or force virtual dispatches.

On the read path, one of the major paths is the code generated by `ColumnarBatchScan`. Currently it refers to the abstract `ColumnVector`, so the penalty grows as more implementations are added, but the concrete type is often known from the usage site, e.g. the vectorized Parquet reader always uses `OnHeapColumnVector`. We can use the concrete type directly in the generated code to avoid the penalty.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes #18989 from ueshin/issues/SPARK-21781.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32fa0b81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32fa0b81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32fa0b81

Branch: refs/heads/master
Commit: 32fa0b81411f781173e185f4b19b9fd6d118f9fe
Parents: c7270a4
Author: Takuya UESHIN <[email protected]>
Authored: Tue Aug 29 20:16:45 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Tue Aug 29 20:16:45 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/ColumnarBatchScan.scala      | 14 +++++++++-----
 .../spark/sql/execution/DataSourceScanExec.scala     |  5 +++++
 .../spark/sql/execution/datasources/FileFormat.scala | 10 ++++++++++
 .../datasources/parquet/ParquetFileFormat.scala      |  8 ++++++++
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 74a47da..1afe83e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -33,6 +33,8 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
   val inMemoryTableScan: InMemoryTableScanExec = null
 
+  def vectorTypes: Option[Seq[String]] = None
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
@@ -79,17 +81,19 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     val scanTimeTotalNs = ctx.freshName("scanTime")
     ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
 
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val columnarBatchClz = classOf[ColumnarBatch].getName
     val batch = ctx.freshName("batch")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
 
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val idx = ctx.freshName("batchIdx")
     ctx.addMutableState("int", idx, s"$idx = 0;")
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
+    val columnVectorClzs = vectorTypes.getOrElse(
+      Seq.fill(colVars.size)(classOf[ColumnVector].getName))
+    val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map {
+      case ((name, columnVectorClz), i) =>
+        ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+        s"$name = ($columnVectorClz) $batch.column($i);"
     }
 
     val nextBatch = ctx.freshName("nextBatch")
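To see what the change above does to the generated code, here is a small self-contained sketch of the new mapping logic. It is illustrative only: `batch` and `colInstance*` are stand-in names, and the real code splits declaration and assignment via `ctx.addMutableState`, but the essential difference is the declared type and the cast. With the default `None` the column variables stay typed as the abstract `ColumnVector`; with concrete class names the generated code can be devirtualized and inlined by the JIT.

```scala
object ColumnAssignSketch {
  // Hypothetical stand-ins for the codegen context in ColumnarBatchScan.
  val batch = "batch"
  val colVars = Seq("colInstance0", "colInstance1")
  val genericClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
  val onHeapClz = "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"

  // Mirrors the new logic: fall back to the abstract ColumnVector when no
  // concrete vector types are known for this scan.
  def columnAssigns(vectorTypes: Option[Seq[String]]): Seq[String] = {
    val columnVectorClzs = vectorTypes.getOrElse(Seq.fill(colVars.size)(genericClz))
    colVars.zip(columnVectorClzs).zipWithIndex.map { case ((name, clz), i) =>
      s"$clz $name = ($clz) $batch.column($i);"
    }
  }

  def main(args: Array[String]): Unit = {
    // Generic: "...ColumnVector colInstance0 = (...ColumnVector) batch.column(0);"
    columnAssigns(None).foreach(println)
    // Concrete: "...OnHeapColumnVector colInstance0 = (...OnHeapColumnVector) batch.column(0);"
    columnAssigns(Some(Seq.fill(colVars.size)(onHeapClz))).foreach(println)
  }
}
```

The only difference between the two outputs is the declared type and the cast, but when a single implementation flows through the scan, that is what keeps the accessor call sites monomorphic.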
http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 588c937..77e6dbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -174,6 +174,11 @@ case class FileSourceScanExec(
     false
   }
 
+  override def vectorTypes: Option[Seq[String]] =
+    relation.fileFormat.vectorTypes(
+      requiredSchema = requiredSchema,
+      partitionSchema = relation.partitionSchema)
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()

http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index dacf462..e5a7aee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -65,6 +65,16 @@ trait FileFormat {
   }
 
   /**
+   * Returns concrete column vector class names for each column to be used in a columnar batch
+   * if this format supports returning columnar batch.
+   */
+  def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    None
+  }
+
+  /**
    * Returns whether a file with `path` could be splitted or not.
    */
   def isSplitable(
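The contract of the new `FileFormat.vectorTypes` hook is simple: return one concrete `ColumnVector` class name per output column, required columns first and partition columns after, or `None` to keep the generic fallback. A minimal sketch of how a format opts in, using toy stand-ins rather than the real Spark classes (the real trait has several other abstract members that are beside the point here):

```scala
// Toy stand-ins for illustration only; the real trait is
// org.apache.spark.sql.execution.datasources.FileFormat.
case class ToySchema(fieldNames: Seq[String])

trait ToyFileFormat {
  // Default mirrors the commit: no concrete vector types are promised.
  def vectorTypes(requiredSchema: ToySchema, partitionSchema: ToySchema): Option[Seq[String]] =
    None
}

object ToyVectorizedFormat extends ToyFileFormat {
  // One class name per column, covering required and partition columns.
  override def vectorTypes(
      requiredSchema: ToySchema,
      partitionSchema: ToySchema): Option[Seq[String]] =
    Option(Seq.fill(requiredSchema.fieldNames.length + partitionSchema.fieldNames.length)(
      "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"))
}

object ToyDemo extends App {
  val types = ToyVectorizedFormat.vectorTypes(ToySchema(Seq("a", "b")), ToySchema(Seq("p")))
  println(types.map(_.size)) // Some(3): two required columns plus one partition column
}
```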
http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 64eea26..e1e7405 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -272,6 +273,13 @@ class ParquetFileFormat
     schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
+  override def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
+      classOf[OnHeapColumnVector].getName))
+  }
+
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],
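Assuming a Spark build that contains this commit is on the classpath, the Parquet behaviour added above can be observed directly; the schemas below are arbitrary examples:

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ParquetVectorTypesDemo {
  def main(args: Array[String]): Unit = {
    val requiredSchema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType)))
    val partitionSchema = StructType(Seq(StructField("p", IntegerType)))

    // Parquet reports OnHeapColumnVector for every column, required and
    // partition alike, so the generated scan can cast to the concrete class.
    val types = new ParquetFileFormat().vectorTypes(requiredSchema, partitionSchema)
    println(types)
    // Some(List(org.apache.spark.sql.execution.vectorized.OnHeapColumnVector, ... x3))
  }
}
```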
