Repository: spark Updated Branches: refs/heads/master 62551ccee -> ab5752cb9
[SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeRowConversion ## What changes were proposed in this pull request? `needsUnsafeRowConversion` is used in 2 places: 1. `ColumnarBatchScan.produceRows` 2. `FileSourceScanExec.doExecute` When we hit `ColumnarBatchScan.produceRows`, it means whole stage codegen is on but the vectorized reader is off. The vectorized reader can be off for several reasons: 1. the file format doesn't have a vectorized reader (json, csv, etc.) 2. the vectorized reader config is off 3. the schema is not supported Anyway, when the vectorized reader is off, the file format reader will always return unsafe rows, and other `ColumnarBatchScan` implementations also always return unsafe rows, so `ColumnarBatchScan.needsUnsafeRowConversion` is not needed. When we hit `FileSourceScanExec.doExecute`, it means whole stage codegen is off. For this case, we need the `needsUnsafeRowConversion` to convert `ColumnarRow` to `UnsafeRow`, if the file format reader returns batch. This PR removes `ColumnarBatchScan.needsUnsafeRowConversion`, and keeps this flag only in `FileSourceScanExec` ## How was this patch tested? existing tests Closes #22750 from cloud-fan/minor. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab5752cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab5752cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab5752cb Branch: refs/heads/master Commit: ab5752cb952e6536a68a988289e57100fdbba142 Parents: 62551cc Author: Wenchen Fan <wenc...@databricks.com> Authored: Sat Oct 20 17:45:04 2018 -0700 Committer: Dongjoon Hyun <dongj...@apache.org> Committed: Sat Oct 20 17:45:04 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/execution/ColumnarBatchScan.scala | 10 +--------- .../apache/spark/sql/execution/DataSourceScanExec.scala | 7 ++++--- .../sql/execution/columnar/InMemoryTableScanExec.scala | 2 -- .../execution/datasources/v2/DataSourceV2ScanExec.scala | 2 -- 4 files changed, 5 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ab5752cb/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 9f6b593..7caff69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -34,8 +34,6 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { protected def supportsBatch: Boolean = true - protected def needsUnsafeRowConversion: Boolean = true - override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> 
SQLMetrics.createTimingMetric(sparkContext, "scan time")) @@ -159,17 +157,11 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { ctx.INPUT_ROW = row ctx.currentVars = null - // Always provide `outputVars`, so that the framework can help us build unsafe row if the input - // row is not unsafe row, i.e. `needsUnsafeRowConversion` is true. - val outputVars = output.zipWithIndex.map { case (a, i) => - BoundReference(i, a.dataType, a.nullable).genCode(ctx) - } - val inputRow = if (needsUnsafeRowConversion) null else row s""" |while ($limitNotReachedCond $input.hasNext()) { | InternalRow $row = (InternalRow) $input.next(); | $numOutputRows.add(1); - | ${consume(ctx, outputVars, inputRow).trim} + | ${consume(ctx, null, row).trim} | if (shouldStop()) return; |} """.stripMargin http://git-wip-us.apache.org/repos/asf/spark/blob/ab5752cb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 738c066..a9b18ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -168,10 +168,11 @@ case class FileSourceScanExec( // Note that some vals referring the file-based relation are lazy intentionally // so that this plan can be canonicalized on executor side too. See SPARK-23731. 
- override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch( - relation.sparkSession, StructType.fromAttributes(output)) + override lazy val supportsBatch: Boolean = { + relation.fileFormat.supportBatch(relation.sparkSession, schema) + } - override lazy val needsUnsafeRowConversion: Boolean = { + private lazy val needsUnsafeRowConversion: Boolean = { if (relation.fileFormat.isInstanceOf[ParquetSource]) { SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled } else { http://git-wip-us.apache.org/repos/asf/spark/blob/ab5752cb/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 196d057..8f8d801 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -69,8 +69,6 @@ case class InMemoryTableScanExec( // TODO: revisit this. Shall we always turn off whole stage codegen if the output data are rows? 
override def supportCodegen: Boolean = supportsBatch - override protected def needsUnsafeRowConversion: Boolean = false - private val columnIndices = attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray http://git-wip-us.apache.org/repos/asf/spark/blob/ab5752cb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 04a9773..25f86a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -106,8 +106,6 @@ case class DataSourceV2ScanExec( override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) - override protected def needsUnsafeRowConversion: Boolean = false - override protected def doExecute(): RDD[InternalRow] = { if (supportsBatch) { WholeStageCodegenExec(this)(codegenStageId = 0).execute() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org