This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 02e9f93  [SPARK-27384][SQL] File source V2: Prune unnecessary 
partition columns
02e9f93 is described below

commit 02e9f933097107d870dba87cc03f6003af9b0efa
Author: Gengliang Wang <[email protected]>
AuthorDate: Mon Apr 8 15:14:02 2019 +0800

    [SPARK-27384][SQL] File source V2: Prune unnecessary partition columns
    
    ## What changes were proposed in this pull request?
    
    When scanning file sources, we can prune unnecessary partition columns on 
constructing input partitions, so that:
    1. Reduce the data transformation from Driver to Executors
    2. Make it easier to implement columnar batch readers, since the partition 
columns are already pruned.
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #24296 from gengliangwang/prunePartitionValue.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../v2/FilePartitionReaderFactory.scala            | 13 -------
 .../sql/execution/datasources/v2/FileScan.scala    | 41 +++++++++++++++++++---
 .../execution/datasources/v2/FileScanBuilder.scala | 41 ++++++++++++++++++----
 .../datasources/v2/TextBasedFileScan.scala         |  5 +--
 .../v2/csv/CSVPartitionReaderFactory.scala         |  7 ++--
 .../sql/execution/datasources/v2/csv/CSVScan.scala | 13 ++++---
 .../datasources/v2/csv/CSVScanBuilder.scala        |  5 +--
 .../v2/orc/OrcPartitionReaderFactory.scala         | 37 +++++++++----------
 .../sql/execution/datasources/v2/orc/OrcScan.scala |  9 +++--
 .../datasources/v2/orc/OrcScanBuilder.scala        |  5 +--
 10 files changed, 114 insertions(+), 62 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
index 1daf8ae..d053ea9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
@@ -46,19 +46,6 @@ abstract class FilePartitionReaderFactory extends 
PartitionReaderFactory {
   def buildColumnarReader(partitionedFile: PartitionedFile): 
PartitionReader[ColumnarBatch] = {
     throw new UnsupportedOperationException("Cannot create columnar reader.")
   }
-
-  protected def getReadDataSchema(
-      readSchema: StructType,
-      partitionSchema: StructType,
-      isCaseSensitive: Boolean): StructType = {
-    val partitionNameSet =
-      partitionSchema.fields.map(PartitioningUtils.getColName(_, 
isCaseSensitive)).toSet
-    val fields = readSchema.fields.filterNot { field =>
-      partitionNameSet.contains(PartitioningUtils.getColName(field, 
isCaseSensitive))
-    }
-
-    StructType(fields)
-  }
 }
 
 // A compound class for combining file and its corresponding reader.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index e971fd7..337aac9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -16,9 +16,13 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.Locale
+
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
@@ -28,8 +32,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
-    readSchema: StructType,
-    options: CaseInsensitiveStringMap) extends Scan with Batch {
+    readDataSchema: StructType,
+    readPartitionSchema: StructType) extends Scan with Batch {
   /**
    * Returns whether a file with `path` could be split or not.
    */
@@ -40,7 +44,23 @@ abstract class FileScan(
   protected def partitions: Seq[FilePartition] = {
     val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, 
selectedPartitions)
+    val partitionAttributes = fileIndex.partitionSchema.toAttributes
+    val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> 
a).toMap
+    val readPartitionAttributes = readPartitionSchema.map { readField =>
+      attributeMap.get(normalizeName(readField.name)).getOrElse {
+        throw new AnalysisException(s"Can't find required partition column 
${readField.name} " +
+          s"in partition schema ${fileIndex.partitionSchema}")
+      }
+    }
+    lazy val partitionValueProject =
+      GenerateUnsafeProjection.generate(readPartitionAttributes, 
partitionAttributes)
     val splitFiles = selectedPartitions.flatMap { partition =>
+      // Prune partition values if part of the partition columns are not 
required.
+      val partitionValues = if (readPartitionAttributes != 
partitionAttributes) {
+        partitionValueProject(partition.values).copy()
+      } else {
+        partition.values
+      }
       partition.files.flatMap { file =>
         val filePath = file.getPath
         PartitionedFileUtil.splitFiles(
@@ -49,7 +69,7 @@ abstract class FileScan(
           filePath = filePath,
           isSplitable = isSplitable(filePath),
           maxSplitBytes = maxSplitBytes,
-          partitionValues = partition.values
+          partitionValues = partitionValues
         )
       }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
     }
@@ -61,4 +81,17 @@ abstract class FileScan(
   }
 
   override def toBatch: Batch = this
+
+  override def readSchema(): StructType =
+    StructType(readDataSchema.fields ++ readPartitionSchema.fields)
+
+  private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+  private def normalizeName(name: String): String = {
+    if (isCaseSensitive) {
+      name
+    } else {
+      name.toLowerCase(Locale.ROOT)
+    }
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
index d4e55a5..3b236be 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -16,15 +16,44 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, 
SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitioningUtils}
+import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, 
SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.types.StructType
 
-abstract class FileScanBuilder(schema: StructType)
-  extends ScanBuilder
-  with SupportsPushDownRequiredColumns {
-  protected var readSchema = schema
+abstract class FileScanBuilder(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
+  private val partitionSchema = fileIndex.partitionSchema
+  private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.readSchema = requiredSchema
+    this.requiredSchema = requiredSchema
   }
+
+  protected def readDataSchema(): StructType = {
+    val requiredNameSet = createRequiredNameSet()
+    val fields = dataSchema.fields.filter { field =>
+      val colName = PartitioningUtils.getColName(field, isCaseSensitive)
+      requiredNameSet.contains(colName) && !partitionNameSet.contains(colName)
+    }
+    StructType(fields)
+  }
+
+  protected def readPartitionSchema(): StructType = {
+    val requiredNameSet = createRequiredNameSet()
+    val fields = partitionSchema.fields.filter { field =>
+      val colName = PartitioningUtils.getColName(field, isCaseSensitive)
+      requiredNameSet.contains(colName)
+    }
+    StructType(fields)
+  }
+
+  private def createRequiredNameSet(): Set[String] =
+    requiredSchema.fields.map(PartitioningUtils.getColName(_, 
isCaseSensitive)).toSet
+
+  private val partitionNameSet: Set[String] =
+    partitionSchema.fields.map(PartitioningUtils.getColName(_, 
isCaseSensitive)).toSet
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index 8d9cc68..d6b84dc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -29,9 +29,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 abstract class TextBasedFileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
-    readSchema: StructType,
+    readDataSchema: StructType,
+    readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends FileScan(sparkSession, fileIndex, readSchema, options) {
+  extends FileScan(sparkSession, fileIndex, readDataSchema, 
readPartitionSchema) {
   private var codecFactory: CompressionCodecFactory = _
 
   override def isSplitable(path: Path): Boolean = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index e2d5028..28e3104 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -33,24 +33,21 @@ import org.apache.spark.util.SerializableConfiguration
  * @param sqlConf SQL configuration.
  * @param broadcastedConf Broadcasted serializable Hadoop Configuration.
  * @param dataSchema Schema of CSV files.
+ * @param readDataSchema Required data schema in the batch scan.
  * @param partitionSchema Schema of partitions.
- * @param readSchema Required schema in the batch scan.
  * @param parsedOptions Options for parsing CSV files.
  */
 case class CSVPartitionReaderFactory(
     sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
     dataSchema: StructType,
+    readDataSchema: StructType,
     partitionSchema: StructType,
-    readSchema: StructType,
     parsedOptions: CSVOptions) extends FilePartitionReaderFactory {
   private val columnPruning = sqlConf.csvColumnPruning
-  private val readDataSchema =
-    getReadDataSchema(readSchema, partitionSchema, 
sqlConf.caseSensitiveAnalysis)
 
   override def buildReader(file: PartitionedFile): 
PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
-
     val parser = new UnivocityParser(
       StructType(dataSchema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord)),
       StructType(readDataSchema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord)),
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 8f2f8f2..5bc8029 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -35,9 +35,10 @@ case class CSVScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType,
-    readSchema: StructType,
+    readDataSchema: StructType,
+    readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends TextBasedFileScan(sparkSession, fileIndex, readSchema, options) {
+  extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, 
readPartitionSchema, options) {
 
   private lazy val parsedOptions: CSVOptions = new CSVOptions(
     options.asScala.toMap,
@@ -53,8 +54,8 @@ case class CSVScan(
     // Check a field requirement for corrupt records here to throw an 
exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, 
parsedOptions.columnNameOfCorruptRecord)
 
-    if (readSchema.length == 1 &&
-      readSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+    if (readDataSchema.length == 1 &&
+      readDataSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
       throw new AnalysisException(
         "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed 
when the\n" +
           "referenced columns only include the internal corrupt record 
column\n" +
@@ -72,7 +73,9 @@ case class CSVScan(
     val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
+    // The partition values are already truncated in `FileScan.partitions`.
+    // We should use `readPartitionSchema` as the partition schema here.
     CSVPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-      dataSchema, fileIndex.partitionSchema, readSchema, parsedOptions)
+      dataSchema, readDataSchema, readPartitionSchema, parsedOptions)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala
index dbb3c03..28c5b3d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala
@@ -29,9 +29,10 @@ case class CSVScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     schema: StructType,
     dataSchema: StructType,
-    options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) {
+    options: CaseInsensitiveStringMap)
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
 
   override def build(): Scan = {
-    CSVScan(sparkSession, fileIndex, dataSchema, readSchema, options)
+    CSVScan(sparkSession, fileIndex, dataSchema, readDataSchema(), 
readPartitionSchema(), options)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 1da9469..ec92379 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -46,30 +46,30 @@ import org.apache.spark.util.SerializableConfiguration
  * @param sqlConf SQL configuration.
  * @param broadcastedConf Broadcast serializable Hadoop Configuration.
  * @param dataSchema Schema of orc files.
+ * @param readDataSchema Required data schema in the batch scan.
  * @param partitionSchema Schema of partitions.
- * @param readSchema Required schema in the batch scan.
  */
 case class OrcPartitionReaderFactory(
     sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
     dataSchema: StructType,
-    partitionSchema: StructType,
-    readSchema: StructType) extends FilePartitionReaderFactory {
+    readDataSchema: StructType,
+    partitionSchema: StructType) extends FilePartitionReaderFactory {
+  private val resultSchema = StructType(readDataSchema.fields ++ 
partitionSchema.fields)
   private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   private val capacity = sqlConf.orcVectorizedReaderBatchSize
 
   override def supportColumnarReads(partition: InputPartition): Boolean = {
     sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
-      readSchema.length <= sqlConf.wholeStageMaxNumFields &&
-      readSchema.forall(_.dataType.isInstanceOf[AtomicType])
+      resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
+      resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
   override def buildReader(file: PartitionedFile): 
PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
 
-    val readDataSchema = getReadDataSchema(readSchema, partitionSchema, 
isCaseSensitive)
-    val readDataSchemaString = 
OrcUtils.orcTypeDescriptionString(readDataSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, readDataSchemaString)
+    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
+    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, 
isCaseSensitive)
 
     val filePath = new Path(new URI(file.filePath))
@@ -113,8 +113,8 @@ case class OrcPartitionReaderFactory(
   override def buildColumnarReader(file: PartitionedFile): 
PartitionReader[ColumnarBatch] = {
     val conf = broadcastedConf.value.value
 
-    val readSchemaString = OrcUtils.orcTypeDescriptionString(readSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, readSchemaString)
+    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
+    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, 
isCaseSensitive)
 
     val filePath = new Path(new URI(file.filePath))
@@ -124,13 +124,13 @@ case class OrcPartitionReaderFactory(
     val reader = OrcFile.createReader(filePath, readerOptions)
 
     val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
-      isCaseSensitive, dataSchema, readSchema, reader, conf)
+      isCaseSensitive, dataSchema, readDataSchema, reader, conf)
 
     if (requestedColIdsOrEmptyFile.isEmpty) {
       new EmptyPartitionReader
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get
-      assert(requestedColIds.length == readSchema.length,
+      val requestedColIds = requestedColIdsOrEmptyFile.get ++ 
Array.fill(partitionSchema.length)(-1)
+      assert(requestedColIds.length == resultSchema.length,
         "[BUG] requested column IDs do not match required schema")
       val taskConf = new Configuration(conf)
 
@@ -140,15 +140,12 @@ case class OrcPartitionReaderFactory(
 
       val batchReader = new OrcColumnarBatchReader(capacity)
       batchReader.initialize(fileSplit, taskAttemptContext)
-      val columnNameMap = partitionSchema.fields.map(
-        PartitioningUtils.getColName(_, isCaseSensitive)).zipWithIndex.toMap
-      val requestedPartitionColIds = readSchema.fields.map { field =>
-        columnNameMap.getOrElse(PartitioningUtils.getColName(field, 
isCaseSensitive), -1)
-      }
+      val requestedPartitionColIds =
+        Array.fill(readDataSchema.length)(-1) ++ Range(0, 
partitionSchema.length)
 
       batchReader.initBatch(
-        TypeDescription.fromString(readSchemaString),
-        readSchema.fields,
+        TypeDescription.fromString(resultSchemaString),
+        resultSchema.fields,
         requestedColIds,
         requestedPartitionColIds,
         file.partitionValues)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index fc8a682..dc6b67c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -32,15 +32,18 @@ case class OrcScan(
     hadoopConf: Configuration,
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType,
-    readSchema: StructType,
+    readDataSchema: StructType,
+    readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends FileScan(sparkSession, fileIndex, readSchema, options) {
+  extends FileScan(sparkSession, fileIndex, readDataSchema, 
readPartitionSchema) {
   override def isSplitable(path: Path): Boolean = true
 
   override def createReaderFactory(): PartitionReaderFactory = {
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
+    // The partition values are already truncated in `FileScan.partitions`.
+    // We should use `readPartitionSchema` as the partition schema here.
     OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-      dataSchema, fileIndex.partitionSchema, readSchema)
+      dataSchema, readDataSchema, readPartitionSchema)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 8ac56aa..4c1ec52 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -36,7 +36,7 @@ case class OrcScanBuilder(
     schema: StructType,
     dataSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends FileScanBuilder(schema) with SupportsPushDownFilters {
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with 
SupportsPushDownFilters {
   lazy val hadoopConf = {
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
@@ -44,7 +44,8 @@ case class OrcScanBuilder(
   }
 
   override def build(): Scan = {
-    OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, 
options)
+    OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema,
+      readDataSchema(), readPartitionSchema(), options)
   }
 
   private var _pushedFilters: Array[Filter] = Array.empty


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to