This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a37d6e  [SPARK-27132][SQL] Improve file source V2 framework
2a37d6e is described below

commit 2a37d6ed93b96db78d38d9b9fc705dacfc47207b
Author: Gengliang Wang <[email protected]>
AuthorDate: Fri Mar 15 11:58:03 2019 +0800

    [SPARK-27132][SQL] Improve file source V2 framework
    
    ## What changes were proposed in this pull request?
    
    During the migration of CSV to file source V2 (https://github.com/apache/spark/pull/24005), I found that we can improve the file source V2 framework by:
    1. Checking for duplicated column names in both read and write.
    2. Removing `SupportsPushDownFilters` from `FileScanBuilder`, since not all file sources support filter push-down; sources that do (e.g. ORC) now mix the trait into their own scan builders (see the sketch after this list).
    3. Adding a new member `options` to `FileScan`, since the method `isSplitable` might require data source options.
    4. Making `FileTable.schema` a lazy value instead of a method.
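
    To illustrate points 2 and 3, here is a rough sketch of a file source built on the revised framework. `MyScan`, `MyScanBuilder`, and the option key are hypothetical names used only for illustration; the framework types and imports are the ones touched by this patch.

    ```scala
    import org.apache.hadoop.fs.Path

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
    import org.apache.spark.sql.execution.datasources.v2.{FileScan, FileScanBuilder}
    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.sources.v2.reader.{PartitionReaderFactory, Scan, SupportsPushDownFilters}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Hypothetical scan: FileScan now carries the data source options,
    // so `isSplitable` can consult them.
    case class MyScan(
        sparkSession: SparkSession,
        fileIndex: PartitioningAwareFileIndex,
        readSchema: StructType,
        options: CaseInsensitiveStringMap)
      extends FileScan(sparkSession, fileIndex, readSchema, options) {

      override def isSplitable(path: Path): Boolean =
        !"false".equalsIgnoreCase(options.get("myformat.splitable"))  // hypothetical option key

      // Format-specific reader creation is omitted in this sketch.
      override def createReaderFactory(): PartitionReaderFactory = ???
    }

    // Hypothetical scan builder: filter push-down is now an opt-in mix-in
    // instead of being required of every FileScanBuilder.
    case class MyScanBuilder(
        sparkSession: SparkSession,
        fileIndex: PartitioningAwareFileIndex,
        schema: StructType,
        options: CaseInsensitiveStringMap)
      extends FileScanBuilder(schema) with SupportsPushDownFilters {

      override def pushFilters(filters: Array[Filter]): Array[Filter] = {
        // A real source would translate and keep the supported filters here;
        // this sketch pushes nothing down and returns them all as post-scan filters.
        filters
      }

      override def pushedFilters(): Array[Filter] = Array.empty

      override def build(): Scan = MyScan(sparkSession, fileIndex, readSchema, options)
    }
    ```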
    
    ## How was this patch tested?
    
    Unit tests, including new duplicate-column checks for ORC in `DataFrameReaderWriterSuite`.
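
    For instance, the write path now rejects duplicate column names up front. A minimal sketch of the behaviour the new ORC checks exercise (the suite uses its shared helpers and a temporary directory rather than the literal path below):

    ```scala
    // Inside a ScalaTest suite with SharedSQLContext and testImplicits in scope.
    // With case-insensitive analysis (the default), "c0" and "C0" are duplicates,
    // so the write fails before any files are produced.
    val df = Seq((1, 1)).toDF("c0", "C0")
    intercept[org.apache.spark.sql.AnalysisException] {
      df.write.format("orc").save("/tmp/dup-columns")  // hypothetical output path
    }
    ```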
    
    Closes #24066 from gengliangwang/reviseFileSourceV2.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../datasources/FallbackOrcDataSourceV2.scala          |  2 +-
 .../spark/sql/execution/datasources/v2/FileScan.scala  |  4 +++-
 .../sql/execution/datasources/v2/FileScanBuilder.scala |  3 +--
 .../spark/sql/execution/datasources/v2/FileTable.scala | 10 ++++++++--
 .../execution/datasources/v2/FileWriteBuilder.scala    | 18 ++++++++++++++----
 .../sql/execution/datasources/v2/orc/OrcScan.scala     |  5 ++++-
 .../execution/datasources/v2/orc/OrcScanBuilder.scala  |  7 ++++---
 .../spark/sql/test/DataFrameReaderWriterSuite.scala    |  6 ++++++
 8 files changed, 41 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
index 7c72495..d5e89be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
@@ -40,7 +40,7 @@ class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPl
       val relation = HadoopFsRelation(
         table.fileIndex,
         table.fileIndex.partitionSchema,
-        table.schema(),
+        table.schema,
         None,
         v1FileFormat,
         d.options.asScala.toMap)(sparkSession)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index bdd6a48..6ab5c4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -23,11 +23,13 @@ import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
-    readSchema: StructType) extends Scan with Batch {
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap) extends Scan with Batch {
   /**
    * Returns whether a file with `path` could be split or not.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
index 5dd343b..d4e55a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -21,8 +21,7 @@ import org.apache.spark.sql.types.StructType
 
 abstract class FileScanBuilder(schema: StructType)
   extends ScanBuilder
-  with SupportsPushDownRequiredColumns
-  with SupportsPushDownFilters {
+  with SupportsPushDownRequiredColumns {
   protected var readSchema = schema
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 21fb6fd..9423fe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.SchemaUtils
 
 abstract class FileTable(
     sparkSession: SparkSession,
@@ -50,10 +51,15 @@ abstract class FileTable(
       s"Unable to infer schema for $name. It must be specified manually.")
   }.asNullable
 
-  override def schema(): StructType = {
+  override lazy val schema: StructType = {
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames,
+      "in the data schema", caseSensitive)
+    val partitionSchema = fileIndex.partitionSchema
+    SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
+      "in the partition schema", caseSensitive)
     PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
-      fileIndex.partitionSchema, caseSensitive)._1
+      partitionSchema, caseSensitive)._1
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index e16ee4c..0d07f5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.SerializableConfiguration
 
 abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
@@ -60,10 +61,11 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
   }
 
   override def buildForBatch(): BatchWrite = {
-    validateInputs()
-    val path = new Path(paths.head)
     val sparkSession = SparkSession.active
+    validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    val path = new Path(paths.head)
     val optionsAsScala = options.asScala.toMap
+
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
     val job = getJobInstance(hadoopConf, path)
     val committer = FileCommitProtocol.instantiate(
@@ -122,12 +124,20 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
    */
   def formatName: String
 
-  private def validateInputs(): Unit = {
+  private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
     assert(schema != null, "Missing input data schema")
     assert(queryId != null, "Missing query ID")
     assert(mode != null, "Missing save mode")
-    assert(paths.length == 1)
+
+    if (paths.length != 1) {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${paths.mkString(", ")}")
+    }
+    val pathName = paths.head
+    SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name),
+      s"when inserting into $pathName", caseSensitiveAnalysis)
     DataSource.validateSchema(schema)
+
     schema.foreach { field =>
       if (!supportsDataType(field.dataType)) {
         throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index 3c5dc1f..237eadb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
 case class OrcScan(
@@ -31,7 +32,9 @@ case class OrcScan(
     hadoopConf: Configuration,
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType,
-    readSchema: StructType) extends FileScan(sparkSession, fileIndex, readSchema) {
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends FileScan(sparkSession, fileIndex, readSchema, options) {
   override def isSplitable(path: Path): Boolean = true
 
   override def createReaderFactory(): PartitionReaderFactory = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 0b15341..4767f21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.orc.OrcFilters
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.sources.v2.reader.{Scan, SupportsPushDownFilters}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -35,11 +35,12 @@ case class OrcScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     schema: StructType,
     dataSchema: StructType,
-    options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) {
+    options: CaseInsensitiveStringMap)
+  extends FileScanBuilder(schema) with SupportsPushDownFilters {
   lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
 
   override def build(): Scan = {
-    OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema)
+    OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options)
   }
 
   private var _pushedFilters: Array[Filter] = Array.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index e45ab19..9f96947 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -838,6 +838,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
           checkReadUserSpecifiedDataColumnDuplication(
             Seq((1, 1)).toDF("c0", "c1"), "parquet", c0, c1, src)
           checkReadPartitionColumnDuplication("parquet", c0, c1, src)
+
+          // Check ORC format
+          checkWriteDataColumnDuplication("orc", c0, c1, src)
+          checkReadUserSpecifiedDataColumnDuplication(
+            Seq((1, 1)).toDF("c0", "c1"), "orc", c0, c1, src)
+          checkReadPartitionColumnDuplication("orc", c0, c1, src)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
