This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2a37d6e [SPARK-27132][SQL] Improve file source V2 framework
2a37d6e is described below
commit 2a37d6ed93b96db78d38d9b9fc705dacfc47207b
Author: Gengliang Wang <[email protected]>
AuthorDate: Fri Mar 15 11:58:03 2019 +0800
[SPARK-27132][SQL] Improve file source V2 framework
## What changes were proposed in this pull request?
During the migration of CSV V2 (https://github.com/apache/spark/pull/24005),
I found that we can improve the file source V2 framework by:
1. Checking for duplicated column names on both the read and the write path.
2. Removing `SupportsPushDownFilters` from `FileScanBuilder`, since not all
file sources support filter push-down.
3. Adding a new member `options` to `FileScan`, since the method `isSplitable`
may need data source options (see the sketch after this list).
4. Making `FileTable.schema` a lazy value instead of a method.
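
To make point 3 concrete, here is a minimal, hypothetical sketch of a `FileScan` subclass whose splittability depends on a data source option; the class name and the `multiLine` option are illustrative only and not part of this patch:

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical scan for a text-based format whose files can only be split
// when a (made-up) `multiLine` option is disabled. The check is only possible
// because `options` is now a constructor member of FileScan.
case class MultiLineAwareScan(
    sparkSession: SparkSession,
    fileIndex: PartitioningAwareFileIndex,
    readSchema: StructType,
    options: CaseInsensitiveStringMap)
  extends FileScan(sparkSession, fileIndex, readSchema, options) {

  // Files read in a multi-line mode cannot be split at arbitrary offsets.
  override def isSplitable(path: Path): Boolean =
    !"true".equalsIgnoreCase(options.get("multiLine"))

  // Reader creation is format-specific and omitted from this sketch.
  override def createReaderFactory(): PartitionReaderFactory =
    throw new UnsupportedOperationException("sketch only")
}
```

Point 2 follows the same opt-in pattern as `OrcScanBuilder` in the diff below: a source that supports push-down mixes `SupportsPushDownFilters` into its own builder rather than inheriting it from `FileScanBuilder`.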
## How was this patch tested?
Unit test
Closes #24066 from gengliangwang/reviseFileSourceV2.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../datasources/FallbackOrcDataSourceV2.scala | 2 +-
.../spark/sql/execution/datasources/v2/FileScan.scala | 4 +++-
.../sql/execution/datasources/v2/FileScanBuilder.scala | 3 +--
.../spark/sql/execution/datasources/v2/FileTable.scala | 10 ++++++++--
.../execution/datasources/v2/FileWriteBuilder.scala | 18 ++++++++++++++----
.../sql/execution/datasources/v2/orc/OrcScan.scala | 5 ++++-
.../execution/datasources/v2/orc/OrcScanBuilder.scala | 7 ++++---
.../spark/sql/test/DataFrameReaderWriterSuite.scala | 6 ++++++
8 files changed, 41 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
index 7c72495..d5e89be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
@@ -40,7 +40,7 @@ class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPl
val relation = HadoopFsRelation(
table.fileIndex,
table.fileIndex.partitionSchema,
- table.schema(),
+ table.schema,
None,
v1FileFormat,
d.options.asScala.toMap)(sparkSession)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index bdd6a48..6ab5c4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -23,11 +23,13 @@ import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
abstract class FileScan(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
- readSchema: StructType) extends Scan with Batch {
+ readSchema: StructType,
+ options: CaseInsensitiveStringMap) extends Scan with Batch {
/**
* Returns whether a file with `path` could be split or not.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
index 5dd343b..d4e55a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -21,8 +21,7 @@ import org.apache.spark.sql.types.StructType
abstract class FileScanBuilder(schema: StructType)
extends ScanBuilder
- with SupportsPushDownRequiredColumns
- with SupportsPushDownFilters {
+ with SupportsPushDownRequiredColumns {
protected var readSchema = schema
override def pruneColumns(requiredSchema: StructType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 21fb6fd..9423fe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.SchemaUtils
abstract class FileTable(
sparkSession: SparkSession,
@@ -50,10 +51,15 @@ abstract class FileTable(
s"Unable to infer schema for $name. It must be specified manually.")
}.asNullable
- override def schema(): StructType = {
+ override lazy val schema: StructType = {
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+ SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames,
+ "in the data schema", caseSensitive)
+ val partitionSchema = fileIndex.partitionSchema
+ SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
+ "in the partition schema", caseSensitive)
PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
- fileIndex.partitionSchema, caseSensitive)._1
+ partitionSchema, caseSensitive)._1
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index e16ee4c..0d07f5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.SerializableConfiguration
abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
@@ -60,10 +61,11 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
}
override def buildForBatch(): BatchWrite = {
- validateInputs()
- val path = new Path(paths.head)
val sparkSession = SparkSession.active
+ validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
+ val path = new Path(paths.head)
val optionsAsScala = options.asScala.toMap
+
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
val job = getJobInstance(hadoopConf, path)
val committer = FileCommitProtocol.instantiate(
@@ -122,12 +124,20 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
*/
def formatName: String
- private def validateInputs(): Unit = {
+ private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
assert(schema != null, "Missing input data schema")
assert(queryId != null, "Missing query ID")
assert(mode != null, "Missing save mode")
- assert(paths.length == 1)
+
+ if (paths.length != 1) {
+ throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+ s"got: ${paths.mkString(", ")}")
+ }
+ val pathName = paths.head
+ SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name),
+ s"when inserting into $pathName", caseSensitiveAnalysis)
DataSource.validateSchema(schema)
+
schema.foreach { field =>
if (!supportsDataType(field.dataType)) {
throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index 3c5dc1f..237eadb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
case class OrcScan(
@@ -31,7 +32,9 @@ case class OrcScan(
hadoopConf: Configuration,
fileIndex: PartitioningAwareFileIndex,
dataSchema: StructType,
- readSchema: StructType) extends FileScan(sparkSession, fileIndex, readSchema) {
+ readSchema: StructType,
+ options: CaseInsensitiveStringMap)
+ extends FileScan(sparkSession, fileIndex, readSchema, options) {
override def isSplitable(path: Path): Boolean = true
override def createReaderFactory(): PartitionReaderFactory = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 0b15341..4767f21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.sources.v2.reader.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -35,11 +35,12 @@ case class OrcScanBuilder(
fileIndex: PartitioningAwareFileIndex,
schema: StructType,
dataSchema: StructType,
- options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) {
+ options: CaseInsensitiveStringMap)
+ extends FileScanBuilder(schema) with SupportsPushDownFilters {
lazy val hadoopConf =
sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
override def build(): Scan = {
- OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema)
+ OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options)
}
private var _pushedFilters: Array[Filter] = Array.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index e45ab19..9f96947 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -838,6 +838,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
checkReadUserSpecifiedDataColumnDuplication(
Seq((1, 1)).toDF("c0", "c1"), "parquet", c0, c1, src)
checkReadPartitionColumnDuplication("parquet", c0, c1, src)
+
+ // Check ORC format
+ checkWriteDataColumnDuplication("orc", c0, c1, src)
+ checkReadUserSpecifiedDataColumnDuplication(
+ Seq((1, 1)).toDF("c0", "c1"), "orc", c0, c1, src)
+ checkReadPartitionColumnDuplication("orc", c0, c1, src)
}
}
}