Repository: spark
Updated Branches:
  refs/heads/master 62551ccee -> ab5752cb9


[SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeRowConversion

## What changes were proposed in this pull request?

`needsUnsafeRowConversion` is used in 2 places:
1. `ColumnarBatchScan.produceRows`
2. `FileSourceScanExec.doExecute`

When we hit `ColumnarBatchScan.produceRows`, whole-stage codegen is on but the 
vectorized reader is off. The vectorized reader can be off for several reasons:
1. the file format doesn't have a vectorized reader (JSON, CSV, etc.)
2. the vectorized reader config is off (see the snippet below)
3. the schema is not supported
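
For illustration, here is a minimal standalone snippet (hypothetical app name and 
local path) that exercises reason 2 by disabling the Parquet vectorized reader via 
the `spark.sql.parquet.enableVectorizedReader` config, so the scan falls back to 
the row-based reader even with whole-stage codegen on:

```scala
import org.apache.spark.sql.SparkSession

object VectorizedReaderOffDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("vectorized-reader-off")
      .getOrCreate()

    // Reason 2 above: turn the Parquet vectorized reader off, so the scan reads
    // rows instead of columnar batches (the `produceRows` path described above).
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

    val path = "/tmp/vectorized_reader_off_demo"  // hypothetical path
    spark.range(10).write.mode("overwrite").parquet(path)
    spark.read.parquet(path).show()

    spark.stop()
  }
}
```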

In any case, when the vectorized reader is off, the file format reader always 
returns unsafe rows, and the other `ColumnarBatchScan` implementations always 
return unsafe rows as well, so `ColumnarBatchScan.needsUnsafeRowConversion` is not 
needed.

When we hit `FileSourceScanExec.doExecute`, whole-stage codegen is off. For this 
case, we still need `needsUnsafeRowConversion` to convert `ColumnarRow` to 
`UnsafeRow` if the file format reader returns columnar batches.
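
The conversion itself is what `UnsafeProjection` does. Below is a minimal 
standalone sketch (not the actual `FileSourceScanExec.doExecute` code) showing how 
a generic `InternalRow`, such as a `ColumnarRow` read out of a batch, can be copied 
into the `UnsafeRow` format:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeRowConversionSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // Stand-in for a row that is not yet an UnsafeRow (e.g. a ColumnarRow).
    val row: InternalRow = InternalRow(1, UTF8String.fromString("a"))

    // An UnsafeProjection copies the fields into the UnsafeRow binary format.
    val toUnsafe = UnsafeProjection.create(schema)
    val unsafeRow = toUnsafe(row)

    println(s"${unsafeRow.getInt(0)}, ${unsafeRow.getUTF8String(1)}")
  }
}
```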

This PR removes `ColumnarBatchScan.needsUnsafeRowConversion` and keeps the flag 
only in `FileSourceScanExec`.

## How was this patch tested?

Existing tests.

Closes #22750 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab5752cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab5752cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab5752cb

Branch: refs/heads/master
Commit: ab5752cb952e6536a68a988289e57100fdbba142
Parents: 62551cc
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Sat Oct 20 17:45:04 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Sat Oct 20 17:45:04 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/ColumnarBatchScan.scala    | 10 +---------
 .../apache/spark/sql/execution/DataSourceScanExec.scala   |  7 ++++---
 .../sql/execution/columnar/InMemoryTableScanExec.scala    |  2 --
 .../execution/datasources/v2/DataSourceV2ScanExec.scala   |  2 --
 4 files changed, 5 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab5752cb/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 9f6b593..7caff69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -34,8 +34,6 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
   protected def supportsBatch: Boolean = true
 
-  protected def needsUnsafeRowConversion: Boolean = true
-
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
     "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
@@ -159,17 +157,11 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
     ctx.INPUT_ROW = row
     ctx.currentVars = null
-    // Always provide `outputVars`, so that the framework can help us build unsafe row if the input
-    // row is not unsafe row, i.e. `needsUnsafeRowConversion` is true.
-    val outputVars = output.zipWithIndex.map { case (a, i) =>
-      BoundReference(i, a.dataType, a.nullable).genCode(ctx)
-    }
-    val inputRow = if (needsUnsafeRowConversion) null else row
     s"""
        |while ($limitNotReachedCond $input.hasNext()) {
        |  InternalRow $row = (InternalRow) $input.next();
        |  $numOutputRows.add(1);
-       |  ${consume(ctx, outputVars, inputRow).trim}
+       |  ${consume(ctx, null, row).trim}
        |  if (shouldStop()) return;
        |}
      """.stripMargin

http://git-wip-us.apache.org/repos/asf/spark/blob/ab5752cb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 738c066..a9b18ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -168,10 +168,11 @@ case class FileSourceScanExec(
 
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
-    relation.sparkSession, StructType.fromAttributes(output))
+  override lazy val supportsBatch: Boolean = {
+    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  }
 
-  override lazy val needsUnsafeRowConversion: Boolean = {
+  private lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
       SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/ab5752cb/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 196d057..8f8d801 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -69,8 +69,6 @@ case class InMemoryTableScanExec(
   // TODO: revisit this. Shall we always turn off whole stage codegen if the output data are rows?
   override def supportCodegen: Boolean = supportsBatch
 
-  override protected def needsUnsafeRowConversion: Boolean = false
-
   private val columnIndices =
     attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ab5752cb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 04a9773..25f86a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -106,8 +106,6 @@ case class DataSourceV2ScanExec(
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
 
-  override protected def needsUnsafeRowConversion: Boolean = false
-
   override protected def doExecute(): RDD[InternalRow] = {
     if (supportsBatch) {
       WholeStageCodegenExec(this)(codegenStageId = 0).execute()

