Repository: spark Updated Branches: refs/heads/master 9f558601e -> ce084d3e0
[SPARK-24990][SQL] merge ReadSupport and ReadSupportWithSchema ## What changes were proposed in this pull request? Regarding user-specified schema, data sources may have 3 different behaviors: 1. must have a user-specified schema 2. can't have a user-specified schema 3. can accept the user-specified schema if it's given, or infer the schema. I added `ReadSupportWithSchema` to support these behaviors, following data source v1. But it turns out we don't need this extra interface. We can just add a `createReader(schema, options)` to `ReadSupport` with a default implementation that throws `UnsupportedOperationException`; data sources that accept a user-specified schema override it. TODO: also fix the streaming API in follow-up PRs. ## How was this patch tested? Existing tests. Author: Wenchen Fan <wenc...@databricks.com> Closes #21946 from cloud-fan/ds-schema. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce084d3e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce084d3e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce084d3e Branch: refs/heads/master Commit: ce084d3e06b14897174426665dada0464260da89 Parents: 9f55860 Author: Wenchen Fan <wenc...@databricks.com> Authored: Wed Aug 1 15:57:54 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Wed Aug 1 15:57:54 2018 -0700 ---------------------------------------------------------------------- .../spark/sql/sources/v2/ReadSupport.java | 25 ++++++++++ .../sql/sources/v2/ReadSupportWithSchema.java | 49 -------------------- .../sql/sources/v2/reader/DataSourceReader.java | 3 +- .../org/apache/spark/sql/DataFrameReader.scala | 4 +- .../datasources/v2/DataSourceV2Relation.scala | 20 +------- .../v2/JavaSchemaRequiredDataSource.java | 9 +++- .../sql/sources/v2/DataSourceV2Suite.scala | 16 ++++--- 7 files changed, 47 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java index b2526de..80ac08e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java @@ -18,7 +18,9 @@ package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.DataSourceRegister; import org.apache.spark.sql.sources.v2.reader.DataSourceReader; +import org.apache.spark.sql.types.StructType; /** * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to @@ -33,6 +35,29 @@ public interface ReadSupport extends DataSourceV2 { * If this method fails (by throwing an exception), the action will fail and no Spark job will be * submitted. * + * @param schema the user specified schema. + * @param options the options for the returned data source reader, which is an immutable + * case-insensitive string-to-string map. + * + * By default this method throws {@link UnsupportedOperationException}, implementations should + * override this method to handle user specified schema. + */ + default DataSourceReader createReader(StructType schema, DataSourceOptions options) { + String name; + if (this instanceof DataSourceRegister) { + name = ((DataSourceRegister) this).shortName(); + } else { + name = this.getClass().getName(); + } + throw new UnsupportedOperationException(name + " does not support user specified schema"); + } + + /** + * Creates a {@link DataSourceReader} to scan the data from this data source. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. 
+ * * @param options the options for the returned data source reader, which is an immutable * case-insensitive string-to-string map. */ http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java deleted file mode 100644 index f316599..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data reading ability and scan the data from the data source. - * - * This is a variant of {@link ReadSupport} that accepts user-specified schema when reading data. 
- * A data source can implement both {@link ReadSupport} and {@link ReadSupportWithSchema} if it - * supports both schema inference and user-specified schema. - */ -@InterfaceStability.Evolving -public interface ReadSupportWithSchema extends DataSourceV2 { - - /** - * Create a {@link DataSourceReader} to scan the data from this data source. - * - * If this method fails (by throwing an exception), the action will fail and no Spark job will be - * submitted. - * - * @param schema the full schema of this data source reader. Full schema usually maps to the - * physical schema of the underlying storage of this data source reader, e.g. - * CSV files, JSON files, etc, while this reader may not read data with full - * schema, as column pruning or other optimizations may happen. - * @param options the options for the returned data source reader, which is an immutable - * case-insensitive string-to-string map. - */ - DataSourceReader createReader(StructType schema, DataSourceOptions options); -} http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java index 4a74620..da98fab 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java @@ -23,13 +23,12 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.ReadSupport; -import org.apache.spark.sql.sources.v2.ReadSupportWithSchema; import org.apache.spark.sql.types.StructType; /** * A data source reader that is returned by 
* {@link ReadSupport#createReader(DataSourceOptions)} or - * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}. + * {@link ReadSupport#createReader(StructType, DataSourceOptions)}. * It can mix in various query optimization interfaces to speed up the data scan. The actual scan * logic is delegated to {@link InputPartition}s, which are returned by * {@link #planInputPartitions()}. http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ec9352a..9bd1134 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -194,7 +194,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) if (classOf[DataSourceV2].isAssignableFrom(cls)) { val ds = cls.newInstance().asInstanceOf[DataSourceV2] - if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) { + if (ds.isInstanceOf[ReadSupport]) { val sessionOptions = DataSourceV2Utils.extractSessionConfigs( ds 
= ds, conf = sparkSession.sessionState.conf) val pathsOption = { http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 7613eb2..4616692 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport} import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics} import org.apache.spark.sql.types.StructType @@ -110,22 +110,6 @@ object DataSourceV2Relation { source match { case support: ReadSupport => support - case _: ReadSupportWithSchema => - // this method is only called if there is no user-supplied schema. if there is no - // user-supplied schema and ReadSupport was not implemented, throw a helpful exception. 
- throw new AnalysisException(s"Data source requires a user-supplied schema: $name") - case _ => - throw new AnalysisException(s"Data source is not readable: $name") - } - } - - def asReadSupportWithSchema: ReadSupportWithSchema = { - source match { - case support: ReadSupportWithSchema => - support - case _: ReadSupport => - throw new AnalysisException( - s"Data source does not support user-supplied schema: $name") case _ => throw new AnalysisException(s"Data source is not readable: $name") } @@ -146,7 +130,7 @@ object DataSourceV2Relation { val v2Options = new DataSourceOptions(options.asJava) userSpecifiedSchema match { case Some(s) => - asReadSupportWithSchema.createReader(s, v2Options) + asReadSupport.createReader(s, v2Options) case _ => asReadSupport.createReader(v2Options) } http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java index ca5abd2..6fd6a44 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java @@ -22,12 +22,12 @@ import java.util.List; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.ReadSupportWithSchema; +import org.apache.spark.sql.sources.v2.ReadSupport; import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import org.apache.spark.sql.sources.v2.reader.InputPartition; import org.apache.spark.sql.types.StructType; -public class JavaSchemaRequiredDataSource implements 
DataSourceV2, ReadSupportWithSchema { +public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupport { class Reader implements DataSourceReader { private final StructType schema; @@ -48,6 +48,11 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi } @Override + public DataSourceReader createReader(DataSourceOptions options) { + throw new IllegalArgumentException("requires a user-supplied schema"); + } + + @Override public DataSourceReader createReader(StructType schema, DataSourceOptions options) { return new Reader(schema); } http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index c7da137..b6e594d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -22,9 +22,8 @@ import java.util.{ArrayList, List => JList} import test.org.apache.spark.sql.sources.v2._ import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} +import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec} import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector @@ -135,8 +134,8 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { test("schema required data source") { Seq(classOf[SchemaRequiredDataSource], 
classOf[JavaSchemaRequiredDataSource]).foreach { cls => withClue(cls.getName) { - val e = intercept[AnalysisException](spark.read.format(cls.getName).load()) - assert(e.message.contains("requires a user-supplied schema")) + val e = intercept[IllegalArgumentException](spark.read.format(cls.getName).load()) + assert(e.getMessage.contains("requires a user-supplied schema")) val schema = new StructType().add("i", "int").add("s", "string") val df = spark.read.format(cls.getName).schema(schema).load() @@ -455,15 +454,20 @@ class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType) } -class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema { +class SchemaRequiredDataSource extends DataSourceV2 with ReadSupport { class Reader(val readSchema: StructType) extends DataSourceReader { override def planInputPartitions(): JList[InputPartition[InternalRow]] = java.util.Collections.emptyList() } - override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = + override def createReader(options: DataSourceOptions): DataSourceReader = { + throw new IllegalArgumentException("requires a user-supplied schema") + } + + override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = { new Reader(schema) + } } class BatchDataSourceV2 extends DataSourceV2 with ReadSupport { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org