Repository: spark
Updated Branches:
  refs/heads/master 9f558601e -> ce084d3e0


[SPARK-24990][SQL] merge ReadSupport and ReadSupportWithSchema

## What changes were proposed in this pull request?

Regarding user-specified schemas, data sources may have 3 different behaviors:
1. must have a user-specified schema
2. can't have a user-specified schema
3. can accept the user-specified schema if it's given, or infer the schema otherwise

I added `ReadSupportWithSchema` to support these behaviors, following data 
source v1. But it turns out we don't need this extra interface: we can just add 
a `createReader(schema, options)` overload to `ReadSupport` with a default 
implementation that throws `UnsupportedOperationException`, so only sources 
that accept a user-specified schema need to override it.
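
For illustration, a minimal Scala sketch (not part of this patch) of how the three behaviors map onto the merged interface; `SimpleReader` and `inferSchema` are hypothetical stand-ins:

```scala
import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.StructType

// A trivial reader shared by the sketches below.
class SimpleReader(schema: StructType) extends DataSourceReader {
  override def readSchema(): StructType = schema
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Collections.emptyList()
}

// Behavior 1: a schema is mandatory. createReader(options) is still the
// abstract method, so it must be implemented; here it rejects schema-less reads.
class SchemaRequiredSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    throw new IllegalArgumentException("requires a user-supplied schema")
  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
    new SimpleReader(schema)
}

// Behavior 3: a schema is optional. Infer one when none is given; honor the
// user's schema otherwise. inferSchema is a made-up helper.
class SchemaOptionalSource extends DataSourceV2 with ReadSupport {
  private def inferSchema(options: DataSourceOptions): StructType =
    new StructType().add("value", "string")
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new SimpleReader(inferSchema(options))
  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
    new SimpleReader(schema)
}

// Behavior 2: no user-specified schema allowed. Implement only
// createReader(options); the inherited default of the schema overload throws
// UnsupportedOperationException.
class InferOnlySource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new SimpleReader(new StructType().add("value", "string"))
}
```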

TODO: also fix the streaming API in follow-up PRs.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #21946 from cloud-fan/ds-schema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce084d3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce084d3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce084d3e

Branch: refs/heads/master
Commit: ce084d3e06b14897174426665dada0464260da89
Parents: 9f55860
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Aug 1 15:57:54 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Wed Aug 1 15:57:54 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/sources/v2/ReadSupport.java       | 25 ++++++++++
 .../sql/sources/v2/ReadSupportWithSchema.java   | 49 --------------------
 .../sql/sources/v2/reader/DataSourceReader.java |  3 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  4 +-
 .../datasources/v2/DataSourceV2Relation.scala   | 20 +-------
 .../v2/JavaSchemaRequiredDataSource.java        |  9 +++-
 .../sql/sources/v2/DataSourceV2Suite.scala      | 16 ++++---
 7 files changed, 47 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
index b2526de..80ac08e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
@@ -33,6 +35,29 @@ public interface ReadSupport extends DataSourceV2 {
    * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    * submitted.
    *
+   * @param schema the user specified schema.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   *
+   * By default this method throws {@link UnsupportedOperationException}, implementations should
+   * override this method to handle user specified schema.
+   */
+  default DataSourceReader createReader(StructType schema, DataSourceOptions options) {
+    String name;
+    if (this instanceof DataSourceRegister) {
+      name = ((DataSourceRegister) this).shortName();
+    } else {
+      name = this.getClass().getName();
+    }
+    throw new UnsupportedOperationException(name + " does not support user specified schema");
+  }
+
+  /**
+   * Creates a {@link DataSourceReader} to scan the data from this data source.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   *
    * @param options the options for the returned data source reader, which is an immutable
    *                case-insensitive string-to-string map.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
deleted file mode 100644
index f316599..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability and scan the data from the data source.
- *
- * This is a variant of {@link ReadSupport} that accepts user-specified schema when reading data.
- * A data source can implement both {@link ReadSupport} and {@link ReadSupportWithSchema} if it
- * supports both schema inference and user-specified schema.
- */
-@InterfaceStability.Evolving
-public interface ReadSupportWithSchema extends DataSourceV2 {
-
-  /**
-   * Create a {@link DataSourceReader} to scan the data from this data source.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   *
-   * @param schema the full schema of this data source reader. Full schema usually maps to the
-   *               physical schema of the underlying storage of this data source reader, e.g.
-   *               CSV files, JSON files, etc, while this reader may not read data with full
-   *               schema, as column pruning or other optimizations may happen.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  DataSourceReader createReader(StructType schema, DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
index 4a74620..da98fab 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
@@ -23,13 +23,12 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.types.StructType;
 
 /**
  * A data source reader that is returned by
  * {@link ReadSupport#createReader(DataSourceOptions)} or
- * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
+ * {@link ReadSupport#createReader(StructType, DataSourceOptions)}.
  * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
  * logic is delegated to {@link InputPartition}s, which are returned by
  * {@link #planInputPartitions()}.

http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ec9352a..9bd1134 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -194,7 +194,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance().asInstanceOf[DataSourceV2]
-      if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
+      if (ds.isInstanceOf[ReadSupport]) {
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           ds = ds, conf = sparkSession.sessionState.conf)
         val pathsOption = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 7613eb2..4616692 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
 import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType
 
@@ -110,22 +110,6 @@ object DataSourceV2Relation {
       source match {
         case support: ReadSupport =>
           support
-        case _: ReadSupportWithSchema =>
-          // this method is only called if there is no user-supplied schema. if there is no
-          // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
-          throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
-        case _ =>
-          throw new AnalysisException(s"Data source is not readable: $name")
-      }
-    }
-
-    def asReadSupportWithSchema: ReadSupportWithSchema = {
-      source match {
-        case support: ReadSupportWithSchema =>
-          support
-        case _: ReadSupport =>
-          throw new AnalysisException(
-            s"Data source does not support user-supplied schema: $name")
         case _ =>
           throw new AnalysisException(s"Data source is not readable: $name")
       }
@@ -146,7 +130,7 @@ object DataSourceV2Relation {
       val v2Options = new DataSourceOptions(options.asJava)
       userSpecifiedSchema match {
         case Some(s) =>
-          asReadSupportWithSchema.createReader(s, v2Options)
+          asReadSupport.createReader(s, v2Options)
         case _ =>
           asReadSupport.createReader(v2Options)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index ca5abd2..6fd6a44 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -22,12 +22,12 @@ import java.util.List;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
+import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.types.StructType;
 
-public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
+public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupport {
 
   class Reader implements DataSourceReader {
     private final StructType schema;
@@ -48,6 +48,11 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi
   }
 
   @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
+    throw new IllegalArgumentException("requires a user-supplied schema");
+  }
+
+  @Override
+  public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
     return new Reader(schema);
   }
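
For reference, a usage sketch of driving this test source from the read path (assuming a `SparkSession` named `spark` is in scope; it mirrors the test change below):

```scala
import org.apache.spark.sql.types.StructType
import test.org.apache.spark.sql.sources.v2.JavaSchemaRequiredDataSource

val schema = new StructType().add("i", "int").add("s", "string")

// Without .schema(...), load() now fails fast with the IllegalArgumentException
// thrown by the source's createReader(options) override above.
val df = spark.read
  .format(classOf[JavaSchemaRequiredDataSource].getName)
  .schema(schema)
  .load()
```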

http://git-wip-us.apache.org/repos/asf/spark/blob/ce084d3e/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index c7da137..b6e594d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -22,9 +22,8 @@ import java.util.{ArrayList, List => JList}
 import test.org.apache.spark.sql.sources.v2._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
 import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
@@ -135,8 +134,8 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
   test("schema required data source") {
     Seq(classOf[SchemaRequiredDataSource], classOf[JavaSchemaRequiredDataSource]).foreach { cls =>
       withClue(cls.getName) {
-        val e = intercept[AnalysisException](spark.read.format(cls.getName).load())
-        assert(e.message.contains("requires a user-supplied schema"))
+        val e = intercept[IllegalArgumentException](spark.read.format(cls.getName).load())
+        assert(e.getMessage.contains("requires a user-supplied schema"))
 
         val schema = new StructType().add("i", "int").add("s", "string")
         val df = spark.read.format(cls.getName).schema(schema).load()
@@ -455,15 +454,20 @@ class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
 }
 
 
-class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
+class SchemaRequiredDataSource extends DataSourceV2 with ReadSupport {
 
   class Reader(val readSchema: StructType) extends DataSourceReader {
     override def planInputPartitions(): JList[InputPartition[InternalRow]] =
       java.util.Collections.emptyList()
   }
 
-  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    throw new IllegalArgumentException("requires a user-supplied schema")
+  }
+
+  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {
     new Reader(schema)
+  }
 }
 
 class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {

