This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e76c6c9de083 [SPARK-49414][CONNECT][SQL] Add Shared DataFrameReader
interface
e76c6c9de083 is described below
commit e76c6c9de0834ebda0cc56e55b673febc15a96e5
Author: Herman van Hovell <[email protected]>
AuthorDate: Wed Sep 4 22:11:18 2024 -0400
[SPARK-49414][CONNECT][SQL] Add Shared DataFrameReader interface
### What changes were proposed in this pull request?
This PR creates a shared interface for DataFrameReader.
### Why are the changes needed?
We are creating a shared Scala Spark SQL interface for Classic and Connect.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47975 from hvanhovell/SPARK-49414.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../org/apache/spark/sql/DataFrameReader.scala | 566 +++------------------
.../scala/org/apache/spark/sql/SparkSession.scala | 11 +-
.../CheckConnectJvmClientCompatibility.scala | 8 +
project/MimaExcludes.scala | 20 +
.../apache/spark/sql/api}/DataFrameReader.scala | 359 +++----------
.../org/apache/spark/sql/api/SparkSession.scala | 12 +
.../org/apache/spark/sql/DataFrameReader.scala | 544 +++-----------------
.../scala/org/apache/spark/sql/SparkSession.scala | 11 +-
8 files changed, 261 insertions(+), 1270 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1ad98dc91b21..c3ee7030424e 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -23,11 +23,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Stable
import org.apache.spark.connect.proto.Parse.ParseFormat
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
SparkCharVarcharUtils}
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
-import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types.StructType
/**
@@ -37,144 +33,44 @@ import org.apache.spark.sql.types.StructType
* @since 3.4.0
*/
@Stable
-class DataFrameReader private[sql] (sparkSession: SparkSession) extends
Logging {
-
- /**
- * Specifies the input data source format.
- *
- * @since 3.4.0
- */
- def format(source: String): DataFrameReader = {
- this.source = source
- this
- }
+class DataFrameReader private[sql] (sparkSession: SparkSession)
+ extends api.DataFrameReader[Dataset] {
- /**
- * Specifies the input schema. Some data sources (e.g. JSON) can infer the
input schema
- * automatically from data. By specifying the schema here, the underlying
data source can skip
- * the schema inference step, and thus speed up data loading.
- *
- * @since 3.4.0
- */
- def schema(schema: StructType): DataFrameReader = {
- if (schema != null) {
- val replaced =
SparkCharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
- this.userSpecifiedSchema = Option(replaced)
- }
- this
- }
+ /** @inheritdoc */
+ override def format(source: String): this.type = super.format(source)
- /**
- * Specifies the schema by using the input DDL-formatted string. Some data
sources (e.g. JSON)
- * can infer the input schema automatically from data. By specifying the
schema here, the
- * underlying data source can skip the schema inference step, and thus speed
up data loading.
- *
- * {{{
- * spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")
- * }}}
- *
- * @since 3.4.0
- */
- def schema(schemaString: String): DataFrameReader = {
- schema(StructType.fromDDL(schemaString))
- }
+ /** @inheritdoc */
+ override def schema(schema: StructType): this.type = super.schema(schema)
- /**
- * Adds an input option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def option(key: String, value: String): DataFrameReader = {
- this.extraOptions = this.extraOptions + (key -> value)
- this
- }
+ /** @inheritdoc */
+ override def schema(schemaString: String): this.type =
super.schema(schemaString)
- /**
- * Adds an input option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def option(key: String, value: Boolean): DataFrameReader = option(key,
value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def option(key: String, value: Long): DataFrameReader = option(key,
value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def option(key: String, value: Double): DataFrameReader = option(key,
value.toString)
-
- /**
- * (Scala-specific) Adds input options for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def options(options: scala.collection.Map[String, String]): DataFrameReader
= {
- this.extraOptions ++= options
- this
- }
+ /** @inheritdoc */
+ override def option(key: String, value: String): this.type =
super.option(key, value)
- /**
- * Adds input options for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names. If a new option
- * has the same key case-insensitively, it will override the existing option.
- *
- * @since 3.4.0
- */
- def options(options: java.util.Map[String, String]): DataFrameReader = {
- this.options(options.asScala)
- this
- }
+ /** @inheritdoc */
+ override def option(key: String, value: Boolean): this.type =
super.option(key, value)
- /**
- * Loads input in as a `DataFrame`, for data sources that don't require a
path (e.g. external
- * key-value stores).
- *
- * @since 3.4.0
- */
- def load(): DataFrame = {
- load(Seq.empty: _*) // force invocation of `load(...varargs...)`
- }
+ /** @inheritdoc */
+ override def option(key: String, value: Long): this.type = super.option(key,
value)
- /**
- * Loads input in as a `DataFrame`, for data sources that require a path
(e.g. data backed by a
- * local or distributed file system).
- *
- * @since 3.4.0
- */
- def load(path: String): DataFrame = {
- // force invocation of `load(...varargs...)`
- load(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def option(key: String, value: Double): this.type =
super.option(key, value)
+
+ /** @inheritdoc */
+ override def options(options: scala.collection.Map[String, String]):
this.type =
+ super.options(options)
- /**
- * Loads input in as a `DataFrame`, for data sources that support multiple
paths. Only works if
- * the source is a HadoopFsRelationProvider.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
+ override def options(options: java.util.Map[String, String]): this.type =
super.options(options)
+
+ /** @inheritdoc */
+ override def load(): DataFrame = load(Nil: _*)
+
+ /** @inheritdoc */
+ def load(path: String): DataFrame = load(Seq(path): _*)
+
+ /** @inheritdoc */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
sparkSession.newDataFrame { builder =>
@@ -190,93 +86,29 @@ class DataFrameReader private[sql] (sparkSession:
SparkSession) extends Logging
}
}
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL url named
- * table and connection properties.
- *
- * You can find the JDBC-specific option and parameter documentation for
reading tables via JDBC
- * in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
- def jdbc(url: String, table: String, properties: Properties): DataFrame = {
- // properties should override settings in extraOptions.
- this.extraOptions ++= properties.asScala
- // explicit url and dbtable should override all
- this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
- format("jdbc").load()
- }
+ /** @inheritdoc */
+ override def jdbc(url: String, table: String, properties: Properties):
DataFrame =
+ super.jdbc(url, table, properties)
- // scalastyle:off line.size.limit
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL url named
- * table. Partitions of the table will be retrieved in parallel based on the
parameters passed
- * to this function.
- *
- * Don't create too many partitions in parallel on a large cluster;
otherwise Spark might crash
- * your external database systems.
- *
- * You can find the JDBC-specific option and parameter documentation for
reading tables via JDBC
- * in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param table
- * Name of the table in the external database.
- * @param columnName
- * Alias of `partitionColumn` option. Refer to `partitionColumn` in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- * @param connectionProperties
- * JDBC database connection arguments, a list of arbitrary string
tag/value. Normally at least
- * a "user" and "password" property should be included. "fetchsize" can be
used to control the
- * number of rows per fetch and "queryTimeout" can be used to wait for a
Statement object to
- * execute to the given number of seconds.
- * @since 3.4.0
- */
- // scalastyle:on line.size.limit
- def jdbc(
+ /** @inheritdoc */
+ override def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
- connectionProperties: Properties): DataFrame = {
- // columnName, lowerBound, upperBound and numPartitions override settings
in extraOptions.
- this.extraOptions ++= Map(
- "partitionColumn" -> columnName,
- "lowerBound" -> lowerBound.toString,
- "upperBound" -> upperBound.toString,
- "numPartitions" -> numPartitions.toString)
- jdbc(url, table, connectionProperties)
- }
-
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL url named
- * table using connection properties. The `predicates` parameter gives a
list expressions
- * suitable for inclusion in WHERE clauses; each one defines one partition
of the `DataFrame`.
- *
- * Don't create too many partitions in parallel on a large cluster;
otherwise Spark might crash
- * your external database systems.
- *
- * You can find the JDBC-specific option and parameter documentation for
reading tables via JDBC
- * in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param table
- * Name of the table in the external database.
- * @param predicates
- * Condition in the where clause for each partition.
- * @param connectionProperties
- * JDBC database connection arguments, a list of arbitrary string
tag/value. Normally at least
- * a "user" and "password" property should be included. "fetchsize" can be
used to control the
- * number of rows per fetch.
- * @since 3.4.0
- */
+ connectionProperties: Properties): DataFrame =
+ super.jdbc(
+ url,
+ table,
+ columnName,
+ lowerBound,
+ upperBound,
+ numPartitions,
+ connectionProperties)
+
+ /** @inheritdoc */
def jdbc(
url: String,
table: String,
@@ -296,207 +128,56 @@ class DataFrameReader private[sql] (sparkSession:
SparkSession) extends Logging
}
}
- /**
- * Loads a JSON file and returns the results as a `DataFrame`.
- *
- * See the documentation on the overloaded `json()` method with varargs for
more details.
- *
- * @since 3.4.0
- */
- def json(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- json(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def json(path: String): DataFrame = super.json(path)
- /**
- * Loads JSON files and returns the results as a `DataFrame`.
- *
- * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON)
is supported by
- * default. For JSON (one record per file), set the `multiLine` option to
true.
- *
- * This function goes through the input once to determine the input schema.
If you know the
- * schema in advance, use the version that specifies the schema to avoid the
extra scan.
- *
- * You can find the JSON-specific options for reading JSON files in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def json(paths: String*): DataFrame = {
- format("json").load(paths: _*)
- }
+ override def json(paths: String*): DataFrame = super.json(paths: _*)
- /**
- * Loads a `Dataset[String]` storing JSON objects (<a
href="http://jsonlines.org/">JSON Lines
- * text format or newline-delimited JSON</a>) and returns the result as a
`DataFrame`.
- *
- * Unless the schema is specified using `schema` function, this function
goes through the input
- * once to determine the input schema.
- *
- * @param jsonDataset
- * input Dataset with one JSON object per record
- * @since 3.4.0
- */
+ /** @inheritdoc */
def json(jsonDataset: Dataset[String]): DataFrame =
parse(jsonDataset, ParseFormat.PARSE_FORMAT_JSON)
- /**
- * Loads a CSV file and returns the result as a `DataFrame`. See the
documentation on the other
- * overloaded `csv()` method for more details.
- *
- * @since 3.4.0
- */
- def csv(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- csv(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def csv(path: String): DataFrame = super.csv(path)
- /**
- * Loads CSV files and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input
schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable
`inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can find the CSV-specific options for reading CSV files in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def csv(paths: String*): DataFrame = format("csv").load(paths: _*)
-
- /**
- * Loads an `Dataset[String]` storing CSV rows and returns the result as a
`DataFrame`.
- *
- * If the schema is not specified using `schema` function and `inferSchema`
option is enabled,
- * this function goes through the input once to determine the input schema.
- *
- * If the schema is not specified using `schema` function and `inferSchema`
option is disabled,
- * it determines the columns as string types and it reads only the first
line to determine the
- * names and the number of fields.
- *
- * If the enforceSchema is set to `false`, only the CSV header in the first
line is checked to
- * conform specified or inferred schema.
- *
- * @note
- * if `header` option is set to `true` when calling this API, all lines
same with the header
- * will be removed if exists.
- * @param csvDataset
- * input Dataset with one CSV row per record
- * @since 3.4.0
- */
+ override def csv(paths: String*): DataFrame = super.csv(paths: _*)
+
+ /** @inheritdoc */
def csv(csvDataset: Dataset[String]): DataFrame =
parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
- /**
- * Loads a XML file and returns the result as a `DataFrame`. See the
documentation on the other
- * overloaded `xml()` method for more details.
- *
- * @since 4.0.0
- */
- def xml(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- xml(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def xml(path: String): DataFrame = super.xml(path)
- /**
- * Loads XML files and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input
schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable
`inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can find the XML-specific options for reading XML files in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 4.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def xml(paths: String*): DataFrame = format("xml").load(paths: _*)
-
- /**
- * Loads an `Dataset[String]` storing XML object and returns the result as a
`DataFrame`.
- *
- * If the schema is not specified using `schema` function and `inferSchema`
option is enabled,
- * this function goes through the input once to determine the input schema.
- *
- * @param xmlDataset
- * input Dataset with one XML object per record
- * @since 4.0.0
- */
+ override def xml(paths: String*): DataFrame = super.xml(paths: _*)
+
+ /** @inheritdoc */
def xml(xmlDataset: Dataset[String]): DataFrame =
parse(xmlDataset, ParseFormat.PARSE_FORMAT_UNSPECIFIED)
- /**
- * Loads a Parquet file, returning the result as a `DataFrame`. See the
documentation on the
- * other overloaded `parquet()` method for more details.
- *
- * @since 3.4.0
- */
- def parquet(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- parquet(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def parquet(path: String): DataFrame = super.parquet(path)
- /**
- * Loads a Parquet file, returning the result as a `DataFrame`.
- *
- * Parquet-specific option(s) for reading Parquet files can be found in <a
href=
- *
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option">
Data
- * Source Option</a> in the version you use.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def parquet(paths: String*): DataFrame = {
- format("parquet").load(paths: _*)
- }
+ override def parquet(paths: String*): DataFrame = super.parquet(paths: _*)
- /**
- * Loads an ORC file and returns the result as a `DataFrame`.
- *
- * @param path
- * input path
- * @since 3.4.0
- */
- def orc(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- orc(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def orc(path: String): DataFrame = super.orc(path)
- /**
- * Loads ORC files and returns the result as a `DataFrame`.
- *
- * ORC-specific option(s) for reading ORC files can be found in <a href=
- *
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option">
Data
- * Source Option</a> in the version you use.
- *
- * @param paths
- * input paths
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
-
- /**
- * Returns the specified table/view as a `DataFrame`. If it's a table, it
must support batch
- * reading and the returned DataFrame is the batch scan query plan of this
table. If it's a
- * view, the returned DataFrame is simply the query plan of the view, which
can either be a
- * batch or streaming query plan.
- *
- * @param tableName
- * is either a qualified or unqualified name that designates a table or
view. If a database is
- * specified, it identifies the table/view from the database. Otherwise,
it first attempts to
- * find a temporary view with the given name and then match the table/view
from the current
- * database. Note that, the global temporary view database is also valid
here.
- * @since 3.4.0
- */
+ override def orc(paths: String*): DataFrame = super.orc(paths: _*)
+
+ /** @inheritdoc */
def table(tableName: String): DataFrame = {
+ assertNoSpecifiedSchema("table")
sparkSession.newDataFrame { builder =>
builder.getReadBuilder.getNamedTableBuilder
.setUnparsedIdentifier(tableName)
@@ -504,80 +185,19 @@ class DataFrameReader private[sql] (sparkSession:
SparkSession) extends Logging
}
}
- /**
- * Loads text files and returns a `DataFrame` whose schema starts with a
string column named
- * "value", and followed by partitioned columns if there are any. See the
documentation on the
- * other overloaded `text()` method for more details.
- *
- * @since 3.4.0
- */
- def text(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- text(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def text(path: String): DataFrame = super.text(path)
- /**
- * Loads text files and returns a `DataFrame` whose schema starts with a
string column named
- * "value", and followed by partitioned columns if there are any. The text
files must be encoded
- * as UTF-8.
- *
- * By default, each line in the text files is a new row in the resulting
DataFrame. For example:
- * {{{
- * // Scala:
- * spark.read.text("/path/to/spark/README.md")
- *
- * // Java:
- * spark.read().text("/path/to/spark/README.md")
- * }}}
- *
- * You can find the text-specific options for reading text files in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param paths
- * input paths
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def text(paths: String*): DataFrame = format("text").load(paths: _*)
-
- /**
- * Loads text files and returns a [[Dataset]] of String. See the
documentation on the other
- * overloaded `textFile()` method for more details.
- * @since 3.4.0
- */
- def textFile(path: String): Dataset[String] = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- textFile(Seq(path): _*)
- }
+ override def text(paths: String*): DataFrame = super.text(paths: _*)
- /**
- * Loads text files and returns a [[Dataset]] of String. The underlying
schema of the Dataset
- * contains a single string column named "value". The text files must be
encoded as UTF-8.
- *
- * If the directory structure of the text files contains partitioning
information, those are
- * ignored in the resulting Dataset. To include partitioning information as
columns, use `text`.
- *
- * By default, each line in the text files is a new row in the resulting
DataFrame. For example:
- * {{{
- * // Scala:
- * spark.read.textFile("/path/to/spark/README.md")
- *
- * // Java:
- * spark.read().textFile("/path/to/spark/README.md")
- * }}}
- *
- * You can set the text-specific options as specified in
`DataFrameReader.text`.
- *
- * @param paths
- * input path
- * @since 3.4.0
- */
+ /** @inheritdoc */
+ override def textFile(path: String): Dataset[String] = super.textFile(path)
+
+ /** @inheritdoc */
@scala.annotation.varargs
- def textFile(paths: String*): Dataset[String] = {
- assertNoSpecifiedSchema("textFile")
- text(paths: _*).select("value").as(StringEncoder)
- }
+ override def textFile(paths: String*): Dataset[String] =
super.textFile(paths: _*)
private def assertSourceFormatSpecified(): Unit = {
if (source == null) {
@@ -597,24 +217,4 @@ class DataFrameReader private[sql] (sparkSession:
SparkSession) extends Logging
}
}
}
-
- /**
- * A convenient function for schema validation in APIs.
- */
- private def assertNoSpecifiedSchema(operation: String): Unit = {
- if (userSpecifiedSchema.nonEmpty) {
- throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
- }
- }
-
-
///////////////////////////////////////////////////////////////////////////////////////
- // Builder pattern config options
-
///////////////////////////////////////////////////////////////////////////////////////
-
- private var source: String = _
-
- private var userSpecifiedSchema: Option[StructType] = None
-
- private var extraOptions = CaseInsensitiveMap[String](Map.empty)
-
}
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 092a72ade1ea..93be185e0ffd 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -214,16 +214,7 @@ class SparkSession private[sql] (
sql(query, Array.empty)
}
- /**
- * Returns a [[DataFrameReader]] that can be used to read non-streaming data
in as a
- * `DataFrame`.
- * {{{
- * sparkSession.read.parquet("/path/to/file.parquet")
- * sparkSession.read.schema(schema).json("/path/to/file.json")
- * }}}
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def read: DataFrameReader = new DataFrameReader(this)
/**
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 10b31155376f..aefd830ef181 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -320,6 +320,14 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.UDFRegistration.initializeLogIfNecessary$default$2"),
+ // Protected DataFrameReader methods...
+ ProblemFilters.exclude[DirectMissingMethodProblem](
+ "org.apache.spark.sql.DataFrameReader.validateSingleVariantColumn"),
+ ProblemFilters.exclude[DirectMissingMethodProblem](
+ "org.apache.spark.sql.DataFrameReader.validateJsonSchema"),
+ ProblemFilters.exclude[DirectMissingMethodProblem](
+ "org.apache.spark.sql.DataFrameReader.validateXmlSchema"),
+
// Datasource V2 partition transforms
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.PartitionTransform"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.PartitionTransform$"),
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 03b7b6efca1b..7abf31f4aca3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -125,6 +125,26 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation$"),
+ // SPARK-49414: Remove Logging from DataFrameReader.
+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.DataFrameReader"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logName"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.log"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logInfo"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logDebug"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logTrace"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logWarning"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logError"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logInfo"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logDebug"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logTrace"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logWarning"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logError"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.isTraceEnabled"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary$default$2"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeForcefully"),
+
// SPARK-49425: Create a shared DataFrameWriter interface.
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriter"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
similarity index 57%
copy from sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
copy to sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
index 9d7a765a24c9..b9910e846826 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
@@ -14,38 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.spark.sql
-
-import java.util.{Locale, Properties}
+package org.apache.spark.sql.api
import scala.jdk.CollectionConverters._
-import org.apache.spark.Partition
+import _root_.java.util
+
import org.apache.spark.annotation.Stable
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions,
UnivocityParser}
-import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser,
JSONOptions}
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CharVarcharUtils, FailureSafeParser}
-import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, XmlOptions}
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.csv._
-import org.apache.spark.sql.execution.datasources.jdbc._
-import
org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
-import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.execution.datasources.xml.TextInputXmlDataSource
-import org.apache.spark.sql.execution.datasources.xml.XmlUtils.checkXmlSchema
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
SparkCharVarcharUtils}
+import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.unsafe.types.UTF8String
/**
* Interface used to load a [[Dataset]] from external storage systems (e.g.
file systems,
@@ -54,14 +34,13 @@ import org.apache.spark.unsafe.types.UTF8String
* @since 1.4.0
*/
@Stable
-class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging
{
-
- /**
+abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] {
+  /**
* Specifies the input data source format.
*
* @since 1.4.0
*/
- def format(source: String): DataFrameReader = {
+ def format(source: String): this.type = {
this.source = source
this
}
@@ -73,9 +52,9 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 1.4.0
*/
- def schema(schema: StructType): DataFrameReader = {
+ def schema(schema: StructType): this.type = {
if (schema != null) {
- val replaced =
CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+ val replaced =
SparkCharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
this.userSpecifiedSchema = Option(replaced)
validateSingleVariantColumn()
}
@@ -93,9 +72,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 2.3.0
*/
- def schema(schemaString: String): DataFrameReader = {
- schema(StructType.fromDDL(schemaString))
- }
+ def schema(schemaString: String): this.type =
schema(StructType.fromDDL(schemaString))
/**
* Adds an input option for the underlying data source.
@@ -105,7 +82,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 1.4.0
*/
- def option(key: String, value: String): DataFrameReader = {
+ def option(key: String, value: String): this.type = {
this.extraOptions = this.extraOptions + (key -> value)
validateSingleVariantColumn()
this
@@ -119,7 +96,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 2.0.0
*/
- def option(key: String, value: Boolean): DataFrameReader = option(key,
value.toString)
+ def option(key: String, value: Boolean): this.type = option(key,
value.toString)
/**
* Adds an input option for the underlying data source.
@@ -129,7 +106,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 2.0.0
*/
- def option(key: String, value: Long): DataFrameReader = option(key,
value.toString)
+ def option(key: String, value: Long): this.type = option(key, value.toString)
/**
* Adds an input option for the underlying data source.
@@ -139,7 +116,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 2.0.0
*/
- def option(key: String, value: Double): DataFrameReader = option(key,
value.toString)
+ def option(key: String, value: Double): this.type = option(key,
value.toString)
/**
* (Scala-specific) Adds input options for the underlying data source.
@@ -149,7 +126,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 1.4.0
*/
- def options(options: scala.collection.Map[String, String]): DataFrameReader
= {
+ def options(options: scala.collection.Map[String, String]): this.type = {
this.extraOptions ++= options
validateSingleVariantColumn()
this
@@ -163,10 +140,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 1.4.0
*/
- def options(options: java.util.Map[String, String]): DataFrameReader = {
- this.options(options.asScala)
- this
- }
+ def options(opts: util.Map[String, String]): this.type =
options(opts.asScala)
/**
* Loads input in as a `DataFrame`, for data sources that don't require a
path (e.g. external
@@ -174,9 +148,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 1.4.0
*/
- def load(): DataFrame = {
- load(Seq.empty: _*) // force invocation of `load(...varargs...)`
- }
+ def load(): DS[Row]
/**
* Loads input in as a `DataFrame`, for data sources that require a path
(e.g. data backed by
@@ -184,14 +156,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 1.4.0
*/
- def load(path: String): DataFrame = {
- // force invocation of `load(...varargs...)`
- if (sparkSession.sessionState.conf.legacyPathOptionBehavior) {
- option("path", path).load(Seq.empty: _*)
- } else {
- load(Seq(path): _*)
- }
- }
+ def load(path: String): DS[Row]
/**
* Loads input in as a `DataFrame`, for data sources that support multiple
paths.
@@ -200,40 +165,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @since 1.6.0
*/
@scala.annotation.varargs
- def load(paths: String*): DataFrame = {
- if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
- throw
QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("read")
- }
-
- val legacyPathOptionBehavior =
sparkSession.sessionState.conf.legacyPathOptionBehavior
- if (!legacyPathOptionBehavior &&
- (extraOptions.contains("path") || extraOptions.contains("paths")) &&
paths.nonEmpty) {
- throw QueryCompilationErrors.pathOptionNotSetCorrectlyWhenReadingError()
- }
-
- DataSource.lookupDataSourceV2(source,
sparkSession.sessionState.conf).flatMap { provider =>
- DataSourceV2Utils.loadV2Source(sparkSession, provider,
userSpecifiedSchema, extraOptions,
- source, paths: _*)
- }.getOrElse(loadV1Source(paths: _*))
- }
-
- private def loadV1Source(paths: String*) = {
- val legacyPathOptionBehavior =
sparkSession.sessionState.conf.legacyPathOptionBehavior
- val (finalPaths, finalOptions) = if (!legacyPathOptionBehavior &&
paths.length == 1) {
- (Nil, extraOptions + ("path" -> paths.head))
- } else {
- (paths, extraOptions)
- }
-
- // Code path for data source v1.
- sparkSession.baseRelationToDataFrame(
- DataSource.apply(
- sparkSession,
- paths = finalPaths,
- userSpecifiedSchema = userSpecifiedSchema,
- className = source,
- options = finalOptions.originalMap).resolveRelation())
- }
+ def load(paths: String*): DS[Row]
/**
* Construct a `DataFrame` representing the database table accessible via
JDBC URL
@@ -246,12 +178,12 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 1.4.0
*/
- def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+ def jdbc(url: String, table: String, properties: util.Properties): DS[Row] =
{
assertNoSpecifiedSchema("jdbc")
// properties should override settings in extraOptions.
this.extraOptions ++= properties.asScala
// explicit url and dbtable should override all
- this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url,
JDBCOptions.JDBC_TABLE_NAME -> table)
+ this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
format("jdbc").load()
}
@@ -287,13 +219,13 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
- connectionProperties: Properties): DataFrame = {
+ connectionProperties: util.Properties): DS[Row] = {
// columnName, lowerBound, upperBound and numPartitions override settings
in extraOptions.
this.extraOptions ++= Map(
- JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
- JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
- JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
- JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
+ "partitionColumn" -> columnName,
+ "lowerBound" -> lowerBound.toString,
+ "upperBound" -> upperBound.toString,
+ "numPartitions" -> numPartitions.toString)
jdbc(url, table, connectionProperties)
}
@@ -323,17 +255,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
url: String,
table: String,
predicates: Array[String],
- connectionProperties: Properties): DataFrame = {
- assertNoSpecifiedSchema("jdbc")
- // connectionProperties should override settings in extraOptions.
- val params = extraOptions ++ connectionProperties.asScala
- val options = new JDBCOptions(url, table, params)
- val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i)
=>
- JDBCPartition(part, i) : Partition
- }
- val relation = JDBCRelation(parts, options)(sparkSession)
- sparkSession.baseRelationToDataFrame(relation)
- }
+ connectionProperties: util.Properties): DS[Row]
/**
* Loads a JSON file and returns the results as a `DataFrame`.
@@ -342,7 +264,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 1.4.0
*/
- def json(path: String): DataFrame = {
+ def json(path: String): DS[Row] = {
// This method ensures that calls that explicit need single argument
works, see SPARK-16009
json(Seq(path): _*)
}
@@ -358,43 +280,14 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* You can find the JSON-specific options for reading JSON files in
* <a
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
- * Data Source Option</a> in the version you use.
+ * Data Source Option</a> in the version you use.
*
* @since 2.0.0
*/
@scala.annotation.varargs
- def json(paths: String*): DataFrame = {
- userSpecifiedSchema.foreach(checkJsonSchema)
- format("json").load(paths : _*)
- }
-
- /**
- * Loads a `JavaRDD[String]` storing JSON objects (<a
href="http://jsonlines.org/">JSON
- * Lines text format or newline-delimited JSON</a>) and returns the result as
- * a `DataFrame`.
- *
- * Unless the schema is specified using `schema` function, this function
goes through the
- * input once to determine the input schema.
- *
- * @param jsonRDD input RDD with one JSON object per record
- * @since 1.4.0
- */
- @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
- def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)
-
- /**
- * Loads an `RDD[String]` storing JSON objects (<a
href="http://jsonlines.org/">JSON Lines
- * text format or newline-delimited JSON</a>) and returns the result as a
`DataFrame`.
- *
- * Unless the schema is specified using `schema` function, this function
goes through the
- * input once to determine the input schema.
- *
- * @param jsonRDD input RDD with one JSON object per record
- * @since 1.4.0
- */
- @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
- def json(jsonRDD: RDD[String]): DataFrame = {
- json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
+ def json(paths: String*): DS[Row] = {
+ validateJsonSchema()
+ format("json").load(paths: _*)
}
/**
@@ -407,37 +300,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @param jsonDataset input Dataset with one JSON object per record
* @since 2.2.0
*/
- def json(jsonDataset: Dataset[String]): DataFrame = {
- val parsedOptions = new JSONOptions(
- extraOptions.toMap,
- sparkSession.sessionState.conf.sessionLocalTimeZone,
- sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-
- userSpecifiedSchema.foreach(checkJsonSchema)
- val schema = userSpecifiedSchema.map {
- case s if !SQLConf.get.getConf(
- SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) =>
s.asNullable
- case other => other
- }.getOrElse {
- TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
- }
-
- ExprUtils.verifyColumnNameOfCorruptRecord(schema,
parsedOptions.columnNameOfCorruptRecord)
- val actualSchema =
- StructType(schema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
-
- val createParser = CreateJacksonParser.string _
- val parsed = jsonDataset.rdd.mapPartitions { iter =>
- val rawParser = new JacksonParser(actualSchema, parsedOptions,
allowArrayAsStructs = true)
- val parser = new FailureSafeParser[String](
- input => rawParser.parse(input, createParser, UTF8String.fromString),
- parsedOptions.parseMode,
- schema,
- parsedOptions.columnNameOfCorruptRecord)
- iter.flatMap(parser.parse)
- }
- sparkSession.internalCreateDataFrame(parsed, schema, isStreaming =
jsonDataset.isStreaming)
- }
+ def json(jsonDataset: DS[String]): DS[Row]
/**
* Loads a CSV file and returns the result as a `DataFrame`. See the
documentation on the
@@ -445,7 +308,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 2.0.0
*/
- def csv(path: String): DataFrame = {
+ def csv(path: String): DS[Row] = {
// This method ensures that calls that explicit need single argument
works, see SPARK-16009
csv(Seq(path): _*)
}
@@ -469,63 +332,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @param csvDataset input Dataset with one CSV row per record
* @since 2.2.0
*/
- def csv(csvDataset: Dataset[String]): DataFrame = {
- val parsedOptions: CSVOptions = new CSVOptions(
- extraOptions.toMap,
- sparkSession.sessionState.conf.csvColumnPruning,
- sparkSession.sessionState.conf.sessionLocalTimeZone)
- val filteredLines: Dataset[String] =
- CSVUtils.filterCommentAndEmpty(csvDataset, parsedOptions)
-
- // For performance, short-circuit the collection of the first line when it
won't be used:
- // - TextInputCSVDataSource - Only uses firstLine to infer an
unspecified schema
- // - CSVHeaderChecker - Only uses firstLine to check header, when
headerFlag is true
- // - CSVUtils - Only uses firstLine to filter headers,
when headerFlag is true
- // (If the downstream logic grows more complicated, consider refactoring
to an approach that
- // delegates this decision to the constituent consumers themselves.)
- val maybeFirstLine: Option[String] =
- if (userSpecifiedSchema.isEmpty || parsedOptions.headerFlag) {
- filteredLines.take(1).headOption
- } else {
- None
- }
-
- val schema = userSpecifiedSchema.map {
- case s if !SQLConf.get.getConf(
- SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) =>
s.asNullable
- case other => other
- }.getOrElse {
- TextInputCSVDataSource.inferFromDataset(
- sparkSession,
- csvDataset,
- maybeFirstLine,
- parsedOptions)
- }
-
- ExprUtils.verifyColumnNameOfCorruptRecord(schema,
parsedOptions.columnNameOfCorruptRecord)
- val actualSchema =
- StructType(schema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
-
- val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
- val headerChecker = new CSVHeaderChecker(
- actualSchema,
- parsedOptions,
- source = s"CSV source: $csvDataset")
- headerChecker.checkHeaderColumnNames(firstLine)
- filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine,
parsedOptions))
- }.getOrElse(filteredLines.rdd)
-
- val parsed = linesWithoutHeader.mapPartitions { iter =>
- val rawParser = new UnivocityParser(actualSchema, parsedOptions)
- val parser = new FailureSafeParser[String](
- input => rawParser.parse(input),
- parsedOptions.parseMode,
- schema,
- parsedOptions.columnNameOfCorruptRecord)
- iter.flatMap(parser.parse)
- }
- sparkSession.internalCreateDataFrame(parsed, schema, isStreaming =
csvDataset.isStreaming)
- }
+ def csv(csvDataset: DS[String]): DS[Row]
/**
* Loads CSV files and returns the result as a `DataFrame`.
@@ -541,7 +348,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @since 2.0.0
*/
@scala.annotation.varargs
- def csv(paths: String*): DataFrame = format("csv").load(paths : _*)
+ def csv(paths: String*): DS[Row] = format("csv").load(paths : _*)
/**
* Loads a XML file and returns the result as a `DataFrame`. See the
documentation on the
@@ -549,7 +356,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 4.0.0
*/
- def xml(path: String): DataFrame = {
+ def xml(path: String): DS[Row] = {
// This method ensures that calls that explicit need single argument
works, see SPARK-16009
xml(Seq(path): _*)
}
@@ -568,8 +375,8 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @since 4.0.0
*/
@scala.annotation.varargs
- def xml(paths: String*): DataFrame = {
- userSpecifiedSchema.foreach(checkXmlSchema)
+ def xml(paths: String*): DS[Row] = {
+ validateXmlSchema()
format("xml").load(paths: _*)
}
@@ -582,37 +389,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @param xmlDataset input Dataset with one XML object per record
* @since 4.0.0
*/
- def xml(xmlDataset: Dataset[String]): DataFrame = {
- val parsedOptions: XmlOptions = new XmlOptions(
- extraOptions.toMap,
- sparkSession.sessionState.conf.sessionLocalTimeZone,
- sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-
- userSpecifiedSchema.foreach(checkXmlSchema)
-
- val schema = userSpecifiedSchema.map {
- case s if !SQLConf.get.getConf(
- SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) =>
s.asNullable
- case other => other
- }.getOrElse {
- TextInputXmlDataSource.inferFromDataset(xmlDataset, parsedOptions)
- }
-
- ExprUtils.verifyColumnNameOfCorruptRecord(schema,
parsedOptions.columnNameOfCorruptRecord)
- val actualSchema =
- StructType(schema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
-
- val parsed = xmlDataset.rdd.mapPartitions { iter =>
- val rawParser = new StaxXmlParser(actualSchema, parsedOptions)
- val parser = new FailureSafeParser[String](
- input => rawParser.parse(input),
- parsedOptions.parseMode,
- schema,
- parsedOptions.columnNameOfCorruptRecord)
- iter.flatMap(parser.parse)
- }
- sparkSession.internalCreateDataFrame(parsed, schema, isStreaming =
xmlDataset.isStreaming)
- }
+ def xml(xmlDataset: DS[String]): DS[Row]
/**
* Loads a Parquet file, returning the result as a `DataFrame`. See the
documentation
@@ -620,7 +397,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 2.0.0
*/
- def parquet(path: String): DataFrame = {
+ def parquet(path: String): DS[Row] = {
// This method ensures that calls that explicit need single argument
works, see SPARK-16009
parquet(Seq(path): _*)
}
@@ -636,9 +413,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @since 1.4.0
*/
@scala.annotation.varargs
- def parquet(paths: String*): DataFrame = {
- format("parquet").load(paths: _*)
- }
+ def parquet(paths: String*): DS[Row] = format("parquet").load(paths: _*)
/**
* Loads an ORC file and returns the result as a `DataFrame`.
@@ -646,7 +421,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @param path input path
* @since 1.5.0
*/
- def orc(path: String): DataFrame = {
+ def orc(path: String): DS[Row] = {
// This method ensures that calls that explicit need single argument
works, see SPARK-16009
orc(Seq(path): _*)
}
@@ -663,7 +438,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @since 2.0.0
*/
@scala.annotation.varargs
- def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
+ def orc(paths: String*): DS[Row] = format("orc").load(paths: _*)
/**
* Returns the specified table/view as a `DataFrame`. If it's a table, it
must support batch
@@ -678,13 +453,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* Note that, the global temporary view database is also
valid here.
* @since 1.4.0
*/
- def table(tableName: String): DataFrame = {
- assertNoSpecifiedSchema("table")
- val multipartIdentifier =
- sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
- Dataset.ofRows(sparkSession, UnresolvedRelation(multipartIdentifier,
- new CaseInsensitiveStringMap(extraOptions.toMap.asJava)))
- }
+ def table(tableName: String): DS[Row]
/**
* Loads text files and returns a `DataFrame` whose schema starts with a
string column named
@@ -693,7 +462,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
*
* @since 2.0.0
*/
- def text(path: String): DataFrame = {
+ def text(path: String): DS[Row] = {
// This method ensures that calls that explicit need single argument
works, see SPARK-16009
text(Seq(path): _*)
}
@@ -720,14 +489,14 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @since 1.6.0
*/
@scala.annotation.varargs
- def text(paths: String*): DataFrame = format("text").load(paths : _*)
+ def text(paths: String*): DS[Row] = format("text").load(paths : _*)
/**
* Loads text files and returns a [[Dataset]] of String. See the
documentation on the
* other overloaded `textFile()` method for more details.
* @since 2.0.0
*/
- def textFile(path: String): Dataset[String] = {
+ def textFile(path: String): DS[String] = {
// This method ensures that calls that explicit need single argument
works, see SPARK-16009
textFile(Seq(path): _*)
}
@@ -755,17 +524,17 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* @since 2.0.0
*/
@scala.annotation.varargs
- def textFile(paths: String*): Dataset[String] = {
+ def textFile(paths: String*): DS[String] = {
assertNoSpecifiedSchema("textFile")
- text(paths :
_*).select("value").as[String](sparkSession.implicits.newStringEncoder)
+ text(paths : _*).select("value").as(StringEncoder)
}
/**
* A convenient function for schema validation in APIs.
*/
- private def assertNoSpecifiedSchema(operation: String): Unit = {
+ protected def assertNoSpecifiedSchema(operation: String): Unit = {
if (userSpecifiedSchema.nonEmpty) {
- throw
QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
+ throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
}
}
@@ -773,21 +542,19 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
* Ensure that the `singleVariantColumn` option cannot be used if there is
also a user specified
* schema.
*/
- private def validateSingleVariantColumn(): Unit = {
- if (extraOptions.get(JSONOptions.SINGLE_VARIANT_COLUMN).isDefined &&
- userSpecifiedSchema.isDefined) {
- throw QueryCompilationErrors.invalidSingleVariantColumn()
- }
- }
+ protected def validateSingleVariantColumn(): Unit = ()
+
+ protected def validateJsonSchema(): Unit = ()
+
+ protected def validateXmlSchema(): Unit = ()
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
- private var source: String =
sparkSession.sessionState.conf.defaultDataSourceName
-
- private var userSpecifiedSchema: Option[StructType] = None
+ protected var source: String = _
- private var extraOptions = CaseInsensitiveMap[String](Map.empty)
+ protected var userSpecifiedSchema: Option[StructType] = None
+ protected var extraOptions: CaseInsensitiveMap[String] =
CaseInsensitiveMap[String](Map.empty)
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 0c173d9da498..633a54e32bc8 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -385,6 +385,18 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]]
extends Serializable with C
@scala.annotation.varargs
def addArtifacts(uri: URI*): Unit
+ /**
+ * Returns a [[DataFrameReader]] that can be used to read non-streaming data
in as a
+ * `DataFrame`.
+ * {{{
+ * sparkSession.read.parquet("/path/to/file.parquet")
+ * sparkSession.read.schema(schema).json("/path/to/file.json")
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ def read: DataFrameReader[DS]
+
/**
* Executes some code block and prints to stdout the time taken to execute
the block. This is
* available in Scala only and is used primarily for interactive testing and
debugging.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9d7a765a24c9..f105a77cf253 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -24,13 +24,12 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.Partition
import org.apache.spark.annotation.Stable
import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions,
UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser,
JSONOptions}
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CharVarcharUtils, FailureSafeParser}
+import org.apache.spark.sql.catalyst.util.FailureSafeParser
import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, XmlOptions}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
@@ -54,136 +53,42 @@ import org.apache.spark.unsafe.types.UTF8String
* @since 1.4.0
*/
@Stable
-class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging
{
+class DataFrameReader private[sql](sparkSession: SparkSession)
+ extends api.DataFrameReader[Dataset] {
+ format(sparkSession.sessionState.conf.defaultDataSourceName)
- /**
- * Specifies the input data source format.
- *
- * @since 1.4.0
- */
- def format(source: String): DataFrameReader = {
- this.source = source
- this
- }
+ /** @inheritdoc */
+ override def format(source: String): this.type = super.format(source)
- /**
- * Specifies the input schema. Some data sources (e.g. JSON) can infer the
input schema
- * automatically from data. By specifying the schema here, the underlying
data source can
- * skip the schema inference step, and thus speed up data loading.
- *
- * @since 1.4.0
- */
- def schema(schema: StructType): DataFrameReader = {
- if (schema != null) {
- val replaced =
CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
- this.userSpecifiedSchema = Option(replaced)
- validateSingleVariantColumn()
- }
- this
- }
+ /** @inheritdoc */
+ override def schema(schema: StructType): this.type = super.schema(schema)
- /**
- * Specifies the schema by using the input DDL-formatted string. Some data
sources (e.g. JSON) can
- * infer the input schema automatically from data. By specifying the schema
here, the underlying
- * data source can skip the schema inference step, and thus speed up data
loading.
- *
- * {{{
- * spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")
- * }}}
- *
- * @since 2.3.0
- */
- def schema(schemaString: String): DataFrameReader = {
- schema(StructType.fromDDL(schemaString))
- }
+ /** @inheritdoc */
+ override def schema(schemaString: String): this.type =
super.schema(schemaString)
- /**
- * Adds an input option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 1.4.0
- */
- def option(key: String, value: String): DataFrameReader = {
- this.extraOptions = this.extraOptions + (key -> value)
- validateSingleVariantColumn()
- this
- }
+ /** @inheritdoc */
+ override def option(key: String, value: String): this.type =
super.option(key, value)
- /**
- * Adds an input option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Boolean): DataFrameReader = option(key,
value.toString)
+ /** @inheritdoc */
+ override def option(key: String, value: Boolean): this.type =
super.option(key, value)
- /**
- * Adds an input option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Long): DataFrameReader = option(key,
value.toString)
+ /** @inheritdoc */
+ override def option(key: String, value: Long): this.type = super.option(key,
value)
- /**
- * Adds an input option for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Double): DataFrameReader = option(key,
value.toString)
+ /** @inheritdoc */
+ override def option(key: String, value: Double): this.type =
super.option(key, value)
- /**
- * (Scala-specific) Adds input options for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 1.4.0
- */
- def options(options: scala.collection.Map[String, String]): DataFrameReader
= {
- this.extraOptions ++= options
- validateSingleVariantColumn()
- this
- }
+ /** @inheritdoc */
+ override def options(options: scala.collection.Map[String, String]):
this.type =
+ super.options(options)
- /**
- * Adds input options for the underlying data source.
- *
- * All options are maintained in a case-insensitive way in terms of key
names.
- * If a new option has the same key case-insensitively, it will override the
existing option.
- *
- * @since 1.4.0
- */
- def options(options: java.util.Map[String, String]): DataFrameReader = {
- this.options(options.asScala)
- this
- }
+ /** @inheritdoc */
+ override def options(options: java.util.Map[String, String]): this.type =
super.options(options)
- /**
- * Loads input in as a `DataFrame`, for data sources that don't require a
path (e.g. external
- * key-value stores).
- *
- * @since 1.4.0
- */
- def load(): DataFrame = {
- load(Seq.empty: _*) // force invocation of `load(...varargs...)`
- }
+ /** @inheritdoc */
+ override def load(): DataFrame = load(Nil: _*)
- /**
- * Loads input in as a `DataFrame`, for data sources that require a path
(e.g. data backed by
- * a local or distributed file system).
- *
- * @since 1.4.0
- */
+ /** @inheritdoc */
def load(path: String): DataFrame = {
// force invocation of `load(...varargs...)`
if (sparkSession.sessionState.conf.legacyPathOptionBehavior) {
@@ -193,12 +98,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
}
}
- /**
- * Loads input in as a `DataFrame`, for data sources that support multiple
paths.
- * Only works if the source is a HadoopFsRelationProvider.
- *
- * @since 1.6.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
@@ -235,90 +135,22 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
options = finalOptions.originalMap).resolveRelation())
}
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table and connection properties.
- *
- * You can find the JDBC-specific option and parameter documentation for
reading tables
- * via JDBC in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 1.4.0
- */
- def jdbc(url: String, table: String, properties: Properties): DataFrame = {
- assertNoSpecifiedSchema("jdbc")
- // properties should override settings in extraOptions.
- this.extraOptions ++= properties.asScala
- // explicit url and dbtable should override all
- this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url,
JDBCOptions.JDBC_TABLE_NAME -> table)
- format("jdbc").load()
- }
+ /** @inheritdoc */
+ override def jdbc(url: String, table: String, properties: Properties):
DataFrame =
+ super.jdbc(url, table, properties)
- // scalastyle:off line.size.limit
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table. Partitions of the table will be retrieved in parallel
based on the parameters
- * passed to this function.
- *
- * Don't create too many partitions in parallel on a large cluster;
otherwise Spark might crash
- * your external database systems.
- *
- * You can find the JDBC-specific option and parameter documentation for
reading tables via JDBC in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param table Name of the table in the external database.
- * @param columnName Alias of `partitionColumn` option. Refer to
`partitionColumn` in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- * @param connectionProperties JDBC database connection arguments, a list of
arbitrary string
- * tag/value. Normally at least a "user" and
"password" property
- * should be included. "fetchsize" can be used
to control the
- * number of rows per fetch and "queryTimeout"
can be used to wait
- * for a Statement object to execute to the
given number of seconds.
- * @since 1.4.0
- */
- // scalastyle:on line.size.limit
- def jdbc(
+ /** @inheritdoc */
+ override def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
- connectionProperties: Properties): DataFrame = {
- // columnName, lowerBound, upperBound and numPartitions override settings
in extraOptions.
- this.extraOptions ++= Map(
- JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
- JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
- JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
- JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
- jdbc(url, table, connectionProperties)
- }
+ connectionProperties: Properties): DataFrame =
+ super.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions,
connectionProperties)
- /**
- * Construct a `DataFrame` representing the database table accessible via
JDBC URL
- * url named table using connection properties. The `predicates` parameter
gives a list
- * expressions suitable for inclusion in WHERE clauses; each one defines one
partition
- * of the `DataFrame`.
- *
- * Don't create too many partitions in parallel on a large cluster;
otherwise Spark might crash
- * your external database systems.
- *
- * You can find the JDBC-specific option and parameter documentation for
reading tables
- * via JDBC in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param table Name of the table in the external database.
- * @param predicates Condition in the where clause for each partition.
- * @param connectionProperties JDBC database connection arguments, a list of
arbitrary string
- * tag/value. Normally at least a "user" and
"password" property
- * should be included. "fetchsize" can be used
to control the
- * number of rows per fetch.
- * @since 1.4.0
- */
+ /** @inheritdoc */
def jdbc(
url: String,
table: String,
@@ -335,38 +167,12 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
sparkSession.baseRelationToDataFrame(relation)
}
- /**
- * Loads a JSON file and returns the results as a `DataFrame`.
- *
- * See the documentation on the overloaded `json()` method with varargs for
more details.
- *
- * @since 1.4.0
- */
- def json(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- json(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def json(path: String): DataFrame = super.json(path)
- /**
- * Loads JSON files and returns the results as a `DataFrame`.
- *
- * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON)
is supported by
- * default. For JSON (one record per file), set the `multiLine` option to
true.
- *
- * This function goes through the input once to determine the input schema.
If you know the
- * schema in advance, use the version that specifies the schema to avoid the
extra scan.
- *
- * You can find the JSON-specific options for reading JSON files in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def json(paths: String*): DataFrame = {
- userSpecifiedSchema.foreach(checkJsonSchema)
- format("json").load(paths : _*)
- }
+ override def json(paths: String*): DataFrame = super.json(paths: _*)
/**
* Loads a `JavaRDD[String]` storing JSON objects (<a
href="http://jsonlines.org/">JSON
@@ -397,16 +203,7 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
}
- /**
- * Loads a `Dataset[String]` storing JSON objects (<a
href="http://jsonlines.org/">JSON Lines
- * text format or newline-delimited JSON</a>) and returns the result as a
`DataFrame`.
- *
- * Unless the schema is specified using `schema` function, this function
goes through the
- * input once to determine the input schema.
- *
- * @param jsonDataset input Dataset with one JSON object per record
- * @since 2.2.0
- */
+ /** @inheritdoc */
def json(jsonDataset: Dataset[String]): DataFrame = {
val parsedOptions = new JSONOptions(
extraOptions.toMap,
@@ -439,36 +236,10 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming =
jsonDataset.isStreaming)
}
- /**
- * Loads a CSV file and returns the result as a `DataFrame`. See the
documentation on the
- * other overloaded `csv()` method for more details.
- *
- * @since 2.0.0
- */
- def csv(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- csv(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def csv(path: String): DataFrame = super.csv(path)
- /**
- * Loads an `Dataset[String]` storing CSV rows and returns the result as a
`DataFrame`.
- *
- * If the schema is not specified using `schema` function and `inferSchema`
option is enabled,
- * this function goes through the input once to determine the input schema.
- *
- * If the schema is not specified using `schema` function and `inferSchema`
option is disabled,
- * it determines the columns as string types and it reads only the first
line to determine the
- * names and the number of fields.
- *
- * If the enforceSchema is set to `false`, only the CSV header in the first
line is checked
- * to conform specified or inferred schema.
- *
- * @note if `header` option is set to `true` when calling this API, all
lines same with
- * the header will be removed if exists.
- *
- * @param csvDataset input Dataset with one CSV row per record
- * @since 2.2.0
- */
+ /** @inheritdoc */
def csv(csvDataset: Dataset[String]): DataFrame = {
val parsedOptions: CSVOptions = new CSVOptions(
extraOptions.toMap,
@@ -527,61 +298,18 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming =
csvDataset.isStreaming)
}
- /**
- * Loads CSV files and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input
schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable
`inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can find the CSV-specific options for reading CSV files in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def csv(paths: String*): DataFrame = format("csv").load(paths : _*)
+ override def csv(paths: String*): DataFrame = super.csv(paths: _*)
- /**
- * Loads a XML file and returns the result as a `DataFrame`. See the
documentation on the
- * other overloaded `xml()` method for more details.
- *
- * @since 4.0.0
- */
- def xml(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- xml(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def xml(path: String): DataFrame = super.xml(path)
- /**
- * Loads XML files and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input
schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable
`inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can find the XML-specific options for reading XML files in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 4.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def xml(paths: String*): DataFrame = {
- userSpecifiedSchema.foreach(checkXmlSchema)
- format("xml").load(paths: _*)
- }
+ override def xml(paths: String*): DataFrame = super.xml(paths: _*)
- /**
- * Loads an `Dataset[String]` storing XML object and returns the result as a
`DataFrame`.
- *
- * If the schema is not specified using `schema` function and `inferSchema`
option is enabled,
- * this function goes through the input once to determine the input schema.
- *
- * @param xmlDataset input Dataset with one XML object per record
- * @since 4.0.0
- */
+ /** @inheritdoc */
def xml(xmlDataset: Dataset[String]): DataFrame = {
val parsedOptions: XmlOptions = new XmlOptions(
extraOptions.toMap,
@@ -614,70 +342,21 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming =
xmlDataset.isStreaming)
}
- /**
- * Loads a Parquet file, returning the result as a `DataFrame`. See the
documentation
- * on the other overloaded `parquet()` method for more details.
- *
- * @since 2.0.0
- */
- def parquet(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- parquet(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def parquet(path: String): DataFrame = super.parquet(path)
- /**
- * Loads a Parquet file, returning the result as a `DataFrame`.
- *
- * Parquet-specific option(s) for reading Parquet files can be found in
- * <a href=
- *
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 1.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def parquet(paths: String*): DataFrame = {
- format("parquet").load(paths: _*)
- }
+ override def parquet(paths: String*): DataFrame = super.parquet(paths: _*)
- /**
- * Loads an ORC file and returns the result as a `DataFrame`.
- *
- * @param path input path
- * @since 1.5.0
- */
- def orc(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- orc(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def orc(path: String): DataFrame = super.orc(path)
- /**
- * Loads ORC files and returns the result as a `DataFrame`.
- *
- * ORC-specific option(s) for reading ORC files can be found in
- * <a href=
- *
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param paths input paths
- * @since 2.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
+ override def orc(paths: String*): DataFrame = super.orc(paths: _*)
- /**
- * Returns the specified table/view as a `DataFrame`. If it's a table, it
must support batch
- * reading and the returned DataFrame is the batch scan query plan of this
table. If it's a view,
- * the returned DataFrame is simply the query plan of the view, which can
either be a batch or
- * streaming query plan.
- *
- * @param tableName is either a qualified or unqualified name that
designates a table or view.
- * If a database is specified, it identifies the table/view
from the database.
- * Otherwise, it first attempts to find a temporary view
with the given name
- * and then match the table/view from the current database.
- * Note that, the global temporary view database is also
valid here.
- * @since 1.4.0
- */
+ /** @inheritdoc */
def table(tableName: String): DataFrame = {
assertNoSpecifiedSchema("table")
val multipartIdentifier =
@@ -686,108 +365,31 @@ class DataFrameReader private[sql](sparkSession:
SparkSession) extends Logging {
new CaseInsensitiveStringMap(extraOptions.toMap.asJava)))
}
- /**
- * Loads text files and returns a `DataFrame` whose schema starts with a
string column named
- * "value", and followed by partitioned columns if there are any. See the
documentation on
- * the other overloaded `text()` method for more details.
- *
- * @since 2.0.0
- */
- def text(path: String): DataFrame = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- text(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def text(path: String): DataFrame = super.text(path)
- /**
- * Loads text files and returns a `DataFrame` whose schema starts with a
string column named
- * "value", and followed by partitioned columns if there are any.
- * The text files must be encoded as UTF-8.
- *
- * By default, each line in the text files is a new row in the resulting
DataFrame. For example:
- * {{{
- * // Scala:
- * spark.read.text("/path/to/spark/README.md")
- *
- * // Java:
- * spark.read().text("/path/to/spark/README.md")
- * }}}
- *
- * You can find the text-specific options for reading text files in
- * <a
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @param paths input paths
- * @since 1.6.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def text(paths: String*): DataFrame = format("text").load(paths : _*)
+ override def text(paths: String*): DataFrame = super.text(paths: _*)
- /**
- * Loads text files and returns a [[Dataset]] of String. See the
documentation on the
- * other overloaded `textFile()` method for more details.
- * @since 2.0.0
- */
- def textFile(path: String): Dataset[String] = {
- // This method ensures that calls that explicit need single argument
works, see SPARK-16009
- textFile(Seq(path): _*)
- }
+ /** @inheritdoc */
+ override def textFile(path: String): Dataset[String] = super.textFile(path)
- /**
- * Loads text files and returns a [[Dataset]] of String. The underlying
schema of the Dataset
- * contains a single string column named "value".
- * The text files must be encoded as UTF-8.
- *
- * If the directory structure of the text files contains partitioning
information, those are
- * ignored in the resulting Dataset. To include partitioning information as
columns, use `text`.
- *
- * By default, each line in the text files is a new row in the resulting
DataFrame. For example:
- * {{{
- * // Scala:
- * spark.read.textFile("/path/to/spark/README.md")
- *
- * // Java:
- * spark.read().textFile("/path/to/spark/README.md")
- * }}}
- *
- * You can set the text-specific options as specified in
`DataFrameReader.text`.
- *
- * @param paths input path
- * @since 2.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def textFile(paths: String*): Dataset[String] = {
- assertNoSpecifiedSchema("textFile")
- text(paths :
_*).select("value").as[String](sparkSession.implicits.newStringEncoder)
- }
+ override def textFile(paths: String*): Dataset[String] =
super.textFile(paths: _*)
- /**
- * A convenient function for schema validation in APIs.
- */
- private def assertNoSpecifiedSchema(operation: String): Unit = {
- if (userSpecifiedSchema.nonEmpty) {
- throw
QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
- }
- }
-
- /**
- * Ensure that the `singleVariantColumn` option cannot be used if there is
also a user specified
- * schema.
- */
- private def validateSingleVariantColumn(): Unit = {
+ /** @inheritdoc */
+ override protected def validateSingleVariantColumn(): Unit = {
if (extraOptions.get(JSONOptions.SINGLE_VARIANT_COLUMN).isDefined &&
userSpecifiedSchema.isDefined) {
throw QueryCompilationErrors.invalidSingleVariantColumn()
}
}
-
///////////////////////////////////////////////////////////////////////////////////////
- // Builder pattern config options
-
///////////////////////////////////////////////////////////////////////////////////////
-
- private var source: String =
sparkSession.sessionState.conf.defaultDataSourceName
-
- private var userSpecifiedSchema: Option[StructType] = None
-
- private var extraOptions = CaseInsensitiveMap[String](Map.empty)
+ override protected def validateJsonSchema(): Unit =
+ userSpecifiedSchema.foreach(checkJsonSchema)
+ override protected def validateXmlSchema(): Unit =
+ userSpecifiedSchema.foreach(checkXmlSchema)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 55f67da68221..906e5ede8b4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -649,16 +649,7 @@ class SparkSession private(
artifactManager.addLocalArtifacts(uri.flatMap(Artifact.parseArtifacts))
}
- /**
- * Returns a [[DataFrameReader]] that can be used to read non-streaming data
in as a
- * `DataFrame`.
- * {{{
- * sparkSession.read.parquet("/path/to/file.parquet")
- * sparkSession.read.schema(schema).json("/path/to/file.json")
- * }}}
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def read: DataFrameReader = new DataFrameReader(self)
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]