This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e76c6c9de083 [SPARK-49414][CONNECT][SQL] Add Shared DataFrameReader interface
e76c6c9de083 is described below

commit e76c6c9de0834ebda0cc56e55b673febc15a96e5
Author: Herman van Hovell <[email protected]>
AuthorDate: Wed Sep 4 22:11:18 2024 -0400

    [SPARK-49414][CONNECT][SQL] Add Shared DataFrameReader interface
    
    ### What changes were proposed in this pull request?
    This PR creates a shared interface for DataFrameReader.
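    To make the mechanics concrete, here is a small, self-contained sketch of the
    pattern this change introduces (names are simplified and illustrative; the real
    abstract class is `org.apache.spark.sql.api.DataFrameReader`, which the Classic
    and Connect readers both extend, as the diff below shows):
    
    ```scala
    // Shared builder logic lives in an abstract reader parameterized over the
    // result type; only the terminal load step is backend-specific.
    abstract class AbstractReader[DF] {
      protected var source: String = _
      protected var extraOptions: Map[String, String] = Map.empty
    
      def format(src: String): this.type = { source = src; this }
      def option(key: String, value: String): this.type = {
        extraOptions += (key -> value)
        this
      }
      // Classic builds a logical plan here; Connect builds a proto Read message.
      def load(paths: String*): DF
    }
    
    // Toy backend used only for illustration.
    class EchoReader extends AbstractReader[String] {
      def load(paths: String*): String = s"$source(${paths.mkString(",")}) $extraOptions"
    }
    ```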
    
    ### Why are the changes needed?
    We are creating a shared Scala Spark SQL interface for Classic and Connect.
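    For example, user code written against the shared surface, like the sketch
    below (file path and session settings are illustrative), should compile the
    same way against either backend, because `read` and the `DataFrameReader`
    builder methods are declared once in `sql/api`:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    object SharedReaderUsage {
      def main(args: Array[String]): Unit = {
        // Classic session here; a Connect client session exposes the same reader API.
        val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
        val df = spark.read
          .format("csv")
          .option("header", "true")
          .schema("a INT, b STRING, c DOUBLE")
          .load("/tmp/people.csv") // illustrative path
        df.printSchema()
        spark.stop()
      }
    }
    ```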
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47975 from hvanhovell/SPARK-49414.
    
    Authored-by: Herman van Hovell <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../org/apache/spark/sql/DataFrameReader.scala     | 566 +++------------------
 .../scala/org/apache/spark/sql/SparkSession.scala  |  11 +-
 .../CheckConnectJvmClientCompatibility.scala       |   8 +
 project/MimaExcludes.scala                         |  20 +
 .../apache/spark/sql/api}/DataFrameReader.scala    | 359 +++----------
 .../org/apache/spark/sql/api/SparkSession.scala    |  12 +
 .../org/apache/spark/sql/DataFrameReader.scala     | 544 +++-----------------
 .../scala/org/apache/spark/sql/SparkSession.scala  |  11 +-
 8 files changed, 261 insertions(+), 1270 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1ad98dc91b21..c3ee7030424e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -23,11 +23,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.Stable
 import org.apache.spark.connect.proto.Parse.ParseFormat
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, SparkCharVarcharUtils}
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
-import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -37,144 +33,44 @@ import org.apache.spark.sql.types.StructType
  * @since 3.4.0
  */
 @Stable
-class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging {
-
-  /**
-   * Specifies the input data source format.
-   *
-   * @since 3.4.0
-   */
-  def format(source: String): DataFrameReader = {
-    this.source = source
-    this
-  }
+class DataFrameReader private[sql] (sparkSession: SparkSession)
+    extends api.DataFrameReader[Dataset] {
 
-  /**
-   * Specifies the input schema. Some data sources (e.g. JSON) can infer the 
input schema
-   * automatically from data. By specifying the schema here, the underlying 
data source can skip
-   * the schema inference step, and thus speed up data loading.
-   *
-   * @since 3.4.0
-   */
-  def schema(schema: StructType): DataFrameReader = {
-    if (schema != null) {
-      val replaced = SparkCharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
-      this.userSpecifiedSchema = Option(replaced)
-    }
-    this
-  }
+  /** @inheritdoc */
+  override def format(source: String): this.type = super.format(source)
 
-  /**
-   * Specifies the schema by using the input DDL-formatted string. Some data 
sources (e.g. JSON)
-   * can infer the input schema automatically from data. By specifying the 
schema here, the
-   * underlying data source can skip the schema inference step, and thus speed 
up data loading.
-   *
-   * {{{
-   *   spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")
-   * }}}
-   *
-   * @since 3.4.0
-   */
-  def schema(schemaString: String): DataFrameReader = {
-    schema(StructType.fromDDL(schemaString))
-  }
+  /** @inheritdoc */
+  override def schema(schema: StructType): this.type = super.schema(schema)
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def option(key: String, value: String): DataFrameReader = {
-    this.extraOptions = this.extraOptions + (key -> value)
-    this
-  }
+  /** @inheritdoc */
+  override def schema(schemaString: String): this.type = super.schema(schemaString)
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
-
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
-
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
-
-  /**
-   * (Scala-specific) Adds input options for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataFrameReader = {
-    this.extraOptions ++= options
-    this
-  }
+  /** @inheritdoc */
+  override def option(key: String, value: String): this.type = super.option(key, value)
 
-  /**
-   * Adds input options for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names. If a new option
-   * has the same key case-insensitively, it will override the existing option.
-   *
-   * @since 3.4.0
-   */
-  def options(options: java.util.Map[String, String]): DataFrameReader = {
-    this.options(options.asScala)
-    this
-  }
+  /** @inheritdoc */
+  override def option(key: String, value: Boolean): this.type = super.option(key, value)
 
-  /**
-   * Loads input in as a `DataFrame`, for data sources that don't require a 
path (e.g. external
-   * key-value stores).
-   *
-   * @since 3.4.0
-   */
-  def load(): DataFrame = {
-    load(Seq.empty: _*) // force invocation of `load(...varargs...)`
-  }
+  /** @inheritdoc */
+  override def option(key: String, value: Long): this.type = super.option(key, value)
 
-  /**
-   * Loads input in as a `DataFrame`, for data sources that require a path 
(e.g. data backed by a
-   * local or distributed file system).
-   *
-   * @since 3.4.0
-   */
-  def load(path: String): DataFrame = {
-    // force invocation of `load(...varargs...)`
-    load(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def option(key: String, value: Double): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def options(options: scala.collection.Map[String, String]): this.type =
+    super.options(options)
 
-  /**
-   * Loads input in as a `DataFrame`, for data sources that support multiple 
paths. Only works if
-   * the source is a HadoopFsRelationProvider.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
+  override def options(options: java.util.Map[String, String]): this.type = super.options(options)
+
+  /** @inheritdoc */
+  override def load(): DataFrame = load(Nil: _*)
+
+  /** @inheritdoc */
+  def load(path: String): DataFrame = load(Seq(path): _*)
+
+  /** @inheritdoc */
   @scala.annotation.varargs
   def load(paths: String*): DataFrame = {
     sparkSession.newDataFrame { builder =>
@@ -190,93 +86,29 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     }
   }
 
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL url named
-   * table and connection properties.
-   *
-   * You can find the JDBC-specific option and parameter documentation for 
reading tables via JDBC
-   * in <a
-   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
-  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
-    // properties should override settings in extraOptions.
-    this.extraOptions ++= properties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
-    format("jdbc").load()
-  }
+  /** @inheritdoc */
+  override def jdbc(url: String, table: String, properties: Properties): DataFrame =
+    super.jdbc(url, table, properties)
 
-  // scalastyle:off line.size.limit
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL url named
-   * table. Partitions of the table will be retrieved in parallel based on the 
parameters passed
-   * to this function.
-   *
-   * Don't create too many partitions in parallel on a large cluster; 
otherwise Spark might crash
-   * your external database systems.
-   *
-   * You can find the JDBC-specific option and parameter documentation for 
reading tables via JDBC
-   * in <a
-   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @param table
-   *   Name of the table in the external database.
-   * @param columnName
-   *   Alias of `partitionColumn` option. Refer to `partitionColumn` in <a
-   *   
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   * @param connectionProperties
-   *   JDBC database connection arguments, a list of arbitrary string 
tag/value. Normally at least
-   *   a "user" and "password" property should be included. "fetchsize" can be 
used to control the
-   *   number of rows per fetch and "queryTimeout" can be used to wait for a 
Statement object to
-   *   execute to the given number of seconds.
-   * @since 3.4.0
-   */
-  // scalastyle:on line.size.limit
-  def jdbc(
+  /** @inheritdoc */
+  override def jdbc(
       url: String,
       table: String,
       columnName: String,
       lowerBound: Long,
       upperBound: Long,
       numPartitions: Int,
-      connectionProperties: Properties): DataFrame = {
-    // columnName, lowerBound, upperBound and numPartitions override settings 
in extraOptions.
-    this.extraOptions ++= Map(
-      "partitionColumn" -> columnName,
-      "lowerBound" -> lowerBound.toString,
-      "upperBound" -> upperBound.toString,
-      "numPartitions" -> numPartitions.toString)
-    jdbc(url, table, connectionProperties)
-  }
-
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL url named
-   * table using connection properties. The `predicates` parameter gives a 
list expressions
-   * suitable for inclusion in WHERE clauses; each one defines one partition 
of the `DataFrame`.
-   *
-   * Don't create too many partitions in parallel on a large cluster; 
otherwise Spark might crash
-   * your external database systems.
-   *
-   * You can find the JDBC-specific option and parameter documentation for 
reading tables via JDBC
-   * in <a
-   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @param table
-   *   Name of the table in the external database.
-   * @param predicates
-   *   Condition in the where clause for each partition.
-   * @param connectionProperties
-   *   JDBC database connection arguments, a list of arbitrary string 
tag/value. Normally at least
-   *   a "user" and "password" property should be included. "fetchsize" can be 
used to control the
-   *   number of rows per fetch.
-   * @since 3.4.0
-   */
+      connectionProperties: Properties): DataFrame =
+    super.jdbc(
+      url,
+      table,
+      columnName,
+      lowerBound,
+      upperBound,
+      numPartitions,
+      connectionProperties)
+
+  /** @inheritdoc */
   def jdbc(
       url: String,
       table: String,
@@ -296,207 +128,56 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     }
   }
 
-  /**
-   * Loads a JSON file and returns the results as a `DataFrame`.
-   *
-   * See the documentation on the overloaded `json()` method with varargs for 
more details.
-   *
-   * @since 3.4.0
-   */
-  def json(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    json(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def json(path: String): DataFrame = super.json(path)
 
-  /**
-   * Loads JSON files and returns the results as a `DataFrame`.
-   *
-   * <a href="http://jsonlines.org/";>JSON Lines</a> (newline-delimited JSON) 
is supported by
-   * default. For JSON (one record per file), set the `multiLine` option to 
true.
-   *
-   * This function goes through the input once to determine the input schema. 
If you know the
-   * schema in advance, use the version that specifies the schema to avoid the 
extra scan.
-   *
-   * You can find the JSON-specific options for reading JSON files in <a
-   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def json(paths: String*): DataFrame = {
-    format("json").load(paths: _*)
-  }
+  override def json(paths: String*): DataFrame = super.json(paths: _*)
 
-  /**
-   * Loads a `Dataset[String]` storing JSON objects (<a 
href="http://jsonlines.org/";>JSON Lines
-   * text format or newline-delimited JSON</a>) and returns the result as a 
`DataFrame`.
-   *
-   * Unless the schema is specified using `schema` function, this function 
goes through the input
-   * once to determine the input schema.
-   *
-   * @param jsonDataset
-   *   input Dataset with one JSON object per record
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def json(jsonDataset: Dataset[String]): DataFrame =
     parse(jsonDataset, ParseFormat.PARSE_FORMAT_JSON)
 
-  /**
-   * Loads a CSV file and returns the result as a `DataFrame`. See the 
documentation on the other
-   * overloaded `csv()` method for more details.
-   *
-   * @since 3.4.0
-   */
-  def csv(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    csv(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def csv(path: String): DataFrame = super.csv(path)
 
-  /**
-   * Loads CSV files and returns the result as a `DataFrame`.
-   *
-   * This function will go through the input once to determine the input 
schema if `inferSchema`
-   * is enabled. To avoid going through the entire data once, disable 
`inferSchema` option or
-   * specify the schema explicitly using `schema`.
-   *
-   * You can find the CSV-specific options for reading CSV files in <a
-   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def csv(paths: String*): DataFrame = format("csv").load(paths: _*)
-
-  /**
-   * Loads an `Dataset[String]` storing CSV rows and returns the result as a 
`DataFrame`.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` 
option is enabled,
-   * this function goes through the input once to determine the input schema.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` 
option is disabled,
-   * it determines the columns as string types and it reads only the first 
line to determine the
-   * names and the number of fields.
-   *
-   * If the enforceSchema is set to `false`, only the CSV header in the first 
line is checked to
-   * conform specified or inferred schema.
-   *
-   * @note
-   *   if `header` option is set to `true` when calling this API, all lines 
same with the header
-   *   will be removed if exists.
-   * @param csvDataset
-   *   input Dataset with one CSV row per record
-   * @since 3.4.0
-   */
+  override def csv(paths: String*): DataFrame = super.csv(paths: _*)
+
+  /** @inheritdoc */
   def csv(csvDataset: Dataset[String]): DataFrame =
     parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
 
-  /**
-   * Loads a XML file and returns the result as a `DataFrame`. See the 
documentation on the other
-   * overloaded `xml()` method for more details.
-   *
-   * @since 4.0.0
-   */
-  def xml(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    xml(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def xml(path: String): DataFrame = super.xml(path)
 
-  /**
-   * Loads XML files and returns the result as a `DataFrame`.
-   *
-   * This function will go through the input once to determine the input 
schema if `inferSchema`
-   * is enabled. To avoid going through the entire data once, disable 
`inferSchema` option or
-   * specify the schema explicitly using `schema`.
-   *
-   * You can find the XML-specific options for reading XML files in <a
-   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 4.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def xml(paths: String*): DataFrame = format("xml").load(paths: _*)
-
-  /**
-   * Loads an `Dataset[String]` storing XML object and returns the result as a 
`DataFrame`.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` 
option is enabled,
-   * this function goes through the input once to determine the input schema.
-   *
-   * @param xmlDataset
-   *   input Dataset with one XML object per record
-   * @since 4.0.0
-   */
+  override def xml(paths: String*): DataFrame = super.xml(paths: _*)
+
+  /** @inheritdoc */
   def xml(xmlDataset: Dataset[String]): DataFrame =
     parse(xmlDataset, ParseFormat.PARSE_FORMAT_UNSPECIFIED)
 
-  /**
-   * Loads a Parquet file, returning the result as a `DataFrame`. See the 
documentation on the
-   * other overloaded `parquet()` method for more details.
-   *
-   * @since 3.4.0
-   */
-  def parquet(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    parquet(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def parquet(path: String): DataFrame = super.parquet(path)
 
-  /**
-   * Loads a Parquet file, returning the result as a `DataFrame`.
-   *
-   * Parquet-specific option(s) for reading Parquet files can be found in <a 
href=
-   * 
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option";>
 Data
-   * Source Option</a> in the version you use.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def parquet(paths: String*): DataFrame = {
-    format("parquet").load(paths: _*)
-  }
+  override def parquet(paths: String*): DataFrame = super.parquet(paths: _*)
 
-  /**
-   * Loads an ORC file and returns the result as a `DataFrame`.
-   *
-   * @param path
-   *   input path
-   * @since 3.4.0
-   */
-  def orc(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    orc(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def orc(path: String): DataFrame = super.orc(path)
 
-  /**
-   * Loads ORC files and returns the result as a `DataFrame`.
-   *
-   * ORC-specific option(s) for reading ORC files can be found in <a href=
-   * 
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option";>
 Data
-   * Source Option</a> in the version you use.
-   *
-   * @param paths
-   *   input paths
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
-
-  /**
-   * Returns the specified table/view as a `DataFrame`. If it's a table, it 
must support batch
-   * reading and the returned DataFrame is the batch scan query plan of this 
table. If it's a
-   * view, the returned DataFrame is simply the query plan of the view, which 
can either be a
-   * batch or streaming query plan.
-   *
-   * @param tableName
-   *   is either a qualified or unqualified name that designates a table or 
view. If a database is
-   *   specified, it identifies the table/view from the database. Otherwise, 
it first attempts to
-   *   find a temporary view with the given name and then match the table/view 
from the current
-   *   database. Note that, the global temporary view database is also valid 
here.
-   * @since 3.4.0
-   */
+  override def orc(paths: String*): DataFrame = super.orc(paths: _*)
+
+  /** @inheritdoc */
   def table(tableName: String): DataFrame = {
+    assertNoSpecifiedSchema("table")
     sparkSession.newDataFrame { builder =>
       builder.getReadBuilder.getNamedTableBuilder
         .setUnparsedIdentifier(tableName)
@@ -504,80 +185,19 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     }
   }
 
-  /**
-   * Loads text files and returns a `DataFrame` whose schema starts with a 
string column named
-   * "value", and followed by partitioned columns if there are any. See the 
documentation on the
-   * other overloaded `text()` method for more details.
-   *
-   * @since 3.4.0
-   */
-  def text(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    text(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def text(path: String): DataFrame = super.text(path)
 
-  /**
-   * Loads text files and returns a `DataFrame` whose schema starts with a 
string column named
-   * "value", and followed by partitioned columns if there are any. The text 
files must be encoded
-   * as UTF-8.
-   *
-   * By default, each line in the text files is a new row in the resulting 
DataFrame. For example:
-   * {{{
-   *   // Scala:
-   *   spark.read.text("/path/to/spark/README.md")
-   *
-   *   // Java:
-   *   spark.read().text("/path/to/spark/README.md")
-   * }}}
-   *
-   * You can find the text-specific options for reading text files in <a
-   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @param paths
-   *   input paths
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def text(paths: String*): DataFrame = format("text").load(paths: _*)
-
-  /**
-   * Loads text files and returns a [[Dataset]] of String. See the 
documentation on the other
-   * overloaded `textFile()` method for more details.
-   * @since 3.4.0
-   */
-  def textFile(path: String): Dataset[String] = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    textFile(Seq(path): _*)
-  }
+  override def text(paths: String*): DataFrame = super.text(paths: _*)
 
-  /**
-   * Loads text files and returns a [[Dataset]] of String. The underlying 
schema of the Dataset
-   * contains a single string column named "value". The text files must be 
encoded as UTF-8.
-   *
-   * If the directory structure of the text files contains partitioning 
information, those are
-   * ignored in the resulting Dataset. To include partitioning information as 
columns, use `text`.
-   *
-   * By default, each line in the text files is a new row in the resulting 
DataFrame. For example:
-   * {{{
-   *   // Scala:
-   *   spark.read.textFile("/path/to/spark/README.md")
-   *
-   *   // Java:
-   *   spark.read().textFile("/path/to/spark/README.md")
-   * }}}
-   *
-   * You can set the text-specific options as specified in 
`DataFrameReader.text`.
-   *
-   * @param paths
-   *   input path
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
+  override def textFile(path: String): Dataset[String] = super.textFile(path)
+
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def textFile(paths: String*): Dataset[String] = {
-    assertNoSpecifiedSchema("textFile")
-    text(paths: _*).select("value").as(StringEncoder)
-  }
+  override def textFile(paths: String*): Dataset[String] = super.textFile(paths: _*)
 
   private def assertSourceFormatSpecified(): Unit = {
     if (source == null) {
@@ -597,24 +217,4 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
       }
     }
   }
-
-  /**
-   * A convenient function for schema validation in APIs.
-   */
-  private def assertNoSpecifiedSchema(operation: String): Unit = {
-    if (userSpecifiedSchema.nonEmpty) {
-      throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
-    }
-  }
-
-  
///////////////////////////////////////////////////////////////////////////////////////
-  // Builder pattern config options
-  
///////////////////////////////////////////////////////////////////////////////////////
-
-  private var source: String = _
-
-  private var userSpecifiedSchema: Option[StructType] = None
-
-  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
-
 }
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 092a72ade1ea..93be185e0ffd 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -214,16 +214,7 @@ class SparkSession private[sql] (
     sql(query, Array.empty)
   }
 
-  /**
-   * Returns a [[DataFrameReader]] that can be used to read non-streaming data 
in as a
-   * `DataFrame`.
-   * {{{
-   *   sparkSession.read.parquet("/path/to/file.parquet")
-   *   sparkSession.read.schema(schema).json("/path/to/file.json")
-   * }}}
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def read: DataFrameReader = new DataFrameReader(this)
 
   /**
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 10b31155376f..aefd830ef181 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -320,6 +320,14 @@ object CheckConnectJvmClientCompatibility {
      ProblemFilters.exclude[DirectMissingMethodProblem](
        "org.apache.spark.sql.UDFRegistration.initializeLogIfNecessary$default$2"),
 
+      // Protected DataFrameReader methods...
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.DataFrameReader.validateSingleVariantColumn"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.DataFrameReader.validateJsonSchema"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.DataFrameReader.validateXmlSchema"),
+
      // Datasource V2 partition transforms
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.PartitionTransform"),
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.PartitionTransform$"),
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 03b7b6efca1b..7abf31f4aca3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -125,6 +125,26 @@ object MimaExcludes {
    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation"),
    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Observation$"),
 
+    // SPARK-49414: Remove Logging from DataFrameReader.
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.DataFrameReader"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logName"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.log"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logInfo"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logDebug"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logTrace"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logWarning"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logError"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logInfo"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logDebug"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logTrace"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logWarning"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.logError"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.isTraceEnabled"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeLogIfNecessary$default$2"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.initializeForcefully"),
+
     // SPARK-49425: Create a shared DataFrameWriter interface.
    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriter"),
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
similarity index 57%
copy from sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
copy to sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
index 9d7a765a24c9..b9910e846826 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataFrameReader.scala
@@ -14,38 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.spark.sql
-
-import java.util.{Locale, Properties}
+package org.apache.spark.sql.api
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.Partition
+import _root_.java.util
+
 import org.apache.spark.annotation.Stable
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
-import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, FailureSafeParser}
-import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, XmlOptions}
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.csv._
-import org.apache.spark.sql.execution.datasources.jdbc._
-import org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
-import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.execution.datasources.xml.TextInputXmlDataSource
-import org.apache.spark.sql.execution.datasources.xml.XmlUtils.checkXmlSchema
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, SparkCharVarcharUtils}
+import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Interface used to load a [[Dataset]] from external storage systems (e.g. 
file systems,
@@ -54,14 +34,13 @@ import org.apache.spark.unsafe.types.UTF8String
  * @since 1.4.0
  */
 @Stable
-class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
-
-  /**
+abstract class DataFrameReader[DS[U] <: Dataset[U, DS]] {
+/**
    * Specifies the input data source format.
    *
    * @since 1.4.0
    */
-  def format(source: String): DataFrameReader = {
+  def format(source: String): this.type = {
     this.source = source
     this
   }
@@ -73,9 +52,9 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def schema(schema: StructType): DataFrameReader = {
+  def schema(schema: StructType): this.type = {
     if (schema != null) {
-      val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+      val replaced = SparkCharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
       this.userSpecifiedSchema = Option(replaced)
       validateSingleVariantColumn()
     }
@@ -93,9 +72,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 2.3.0
    */
-  def schema(schemaString: String): DataFrameReader = {
-    schema(StructType.fromDDL(schemaString))
-  }
+  def schema(schemaString: String): this.type = schema(StructType.fromDDL(schemaString))
 
   /**
    * Adds an input option for the underlying data source.
@@ -105,7 +82,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def option(key: String, value: String): DataFrameReader = {
+  def option(key: String, value: String): this.type = {
     this.extraOptions = this.extraOptions + (key -> value)
     validateSingleVariantColumn()
     this
@@ -119,7 +96,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 2.0.0
    */
-  def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
+  def option(key: String, value: Boolean): this.type = option(key, value.toString)
 
   /**
    * Adds an input option for the underlying data source.
@@ -129,7 +106,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 2.0.0
    */
-  def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
+  def option(key: String, value: Long): this.type = option(key, value.toString)
 
   /**
    * Adds an input option for the underlying data source.
@@ -139,7 +116,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 2.0.0
    */
-  def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
+  def option(key: String, value: Double): this.type = option(key, value.toString)
 
   /**
    * (Scala-specific) Adds input options for the underlying data source.
@@ -149,7 +126,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def options(options: scala.collection.Map[String, String]): DataFrameReader = {
+  def options(options: scala.collection.Map[String, String]): this.type = {
     this.extraOptions ++= options
     validateSingleVariantColumn()
     this
@@ -163,10 +140,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def options(options: java.util.Map[String, String]): DataFrameReader = {
-    this.options(options.asScala)
-    this
-  }
+  def options(opts: util.Map[String, String]): this.type = options(opts.asScala)
 
   /**
    * Loads input in as a `DataFrame`, for data sources that don't require a 
path (e.g. external
@@ -174,9 +148,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def load(): DataFrame = {
-    load(Seq.empty: _*) // force invocation of `load(...varargs...)`
-  }
+  def load(): DS[Row]
 
   /**
    * Loads input in as a `DataFrame`, for data sources that require a path 
(e.g. data backed by
@@ -184,14 +156,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def load(path: String): DataFrame = {
-    // force invocation of `load(...varargs...)`
-    if (sparkSession.sessionState.conf.legacyPathOptionBehavior) {
-      option("path", path).load(Seq.empty: _*)
-    } else {
-      load(Seq(path): _*)
-    }
-  }
+  def load(path: String): DS[Row]
 
   /**
    * Loads input in as a `DataFrame`, for data sources that support multiple 
paths.
@@ -200,40 +165,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @since 1.6.0
    */
   @scala.annotation.varargs
-  def load(paths: String*): DataFrame = {
-    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
-      throw 
QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("read")
-    }
-
-    val legacyPathOptionBehavior = 
sparkSession.sessionState.conf.legacyPathOptionBehavior
-    if (!legacyPathOptionBehavior &&
-        (extraOptions.contains("path") || extraOptions.contains("paths")) && 
paths.nonEmpty) {
-      throw QueryCompilationErrors.pathOptionNotSetCorrectlyWhenReadingError()
-    }
-
-    DataSource.lookupDataSourceV2(source, 
sparkSession.sessionState.conf).flatMap { provider =>
-      DataSourceV2Utils.loadV2Source(sparkSession, provider, 
userSpecifiedSchema, extraOptions,
-        source, paths: _*)
-    }.getOrElse(loadV1Source(paths: _*))
-  }
-
-  private def loadV1Source(paths: String*) = {
-    val legacyPathOptionBehavior = 
sparkSession.sessionState.conf.legacyPathOptionBehavior
-    val (finalPaths, finalOptions) = if (!legacyPathOptionBehavior && 
paths.length == 1) {
-      (Nil, extraOptions + ("path" -> paths.head))
-    } else {
-      (paths, extraOptions)
-    }
-
-    // Code path for data source v1.
-    sparkSession.baseRelationToDataFrame(
-      DataSource.apply(
-        sparkSession,
-        paths = finalPaths,
-        userSpecifiedSchema = userSpecifiedSchema,
-        className = source,
-        options = finalOptions.originalMap).resolveRelation())
-  }
+  def load(paths: String*): DS[Row]
 
   /**
    * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
@@ -246,12 +178,12 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+  def jdbc(url: String, table: String, properties: util.Properties): DS[Row] = {
     assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
-    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
     format("jdbc").load()
   }
 
@@ -287,13 +219,13 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
       lowerBound: Long,
       upperBound: Long,
       numPartitions: Int,
-      connectionProperties: Properties): DataFrame = {
+      connectionProperties: util.Properties): DS[Row] = {
     // columnName, lowerBound, upperBound and numPartitions override settings 
in extraOptions.
     this.extraOptions ++= Map(
-      JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
-      JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
-      JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
-      JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
+      "partitionColumn" -> columnName,
+      "lowerBound" -> lowerBound.toString,
+      "upperBound" -> upperBound.toString,
+      "numPartitions" -> numPartitions.toString)
     jdbc(url, table, connectionProperties)
   }
 
@@ -323,17 +255,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
       url: String,
       table: String,
       predicates: Array[String],
-      connectionProperties: Properties): DataFrame = {
-    assertNoSpecifiedSchema("jdbc")
-    // connectionProperties should override settings in extraOptions.
-    val params = extraOptions ++ connectionProperties.asScala
-    val options = new JDBCOptions(url, table, params)
-    val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) 
=>
-      JDBCPartition(part, i) : Partition
-    }
-    val relation = JDBCRelation(parts, options)(sparkSession)
-    sparkSession.baseRelationToDataFrame(relation)
-  }
+      connectionProperties: util.Properties): DS[Row]
 
   /**
    * Loads a JSON file and returns the results as a `DataFrame`.
@@ -342,7 +264,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 1.4.0
    */
-  def json(path: String): DataFrame = {
+  def json(path: String): DS[Row] = {
     // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
     json(Seq(path): _*)
   }
@@ -358,43 +280,14 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * You can find the JSON-specific options for reading JSON files in
    * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
+   * Data Source Option</a> in the version you use.
    *
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def json(paths: String*): DataFrame = {
-    userSpecifiedSchema.foreach(checkJsonSchema)
-    format("json").load(paths : _*)
-  }
-
-  /**
-   * Loads a `JavaRDD[String]` storing JSON objects (<a 
href="http://jsonlines.org/";>JSON
-   * Lines text format or newline-delimited JSON</a>) and returns the result as
-   * a `DataFrame`.
-   *
-   * Unless the schema is specified using `schema` function, this function 
goes through the
-   * input once to determine the input schema.
-   *
-   * @param jsonRDD input RDD with one JSON object per record
-   * @since 1.4.0
-   */
-  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
-  def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)
-
-  /**
-   * Loads an `RDD[String]` storing JSON objects (<a 
href="http://jsonlines.org/";>JSON Lines
-   * text format or newline-delimited JSON</a>) and returns the result as a 
`DataFrame`.
-   *
-   * Unless the schema is specified using `schema` function, this function 
goes through the
-   * input once to determine the input schema.
-   *
-   * @param jsonRDD input RDD with one JSON object per record
-   * @since 1.4.0
-   */
-  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
-  def json(jsonRDD: RDD[String]): DataFrame = {
-    json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
+  def json(paths: String*): DS[Row] = {
+    validateJsonSchema()
+    format("json").load(paths: _*)
   }
 
   /**
@@ -407,37 +300,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @param jsonDataset input Dataset with one JSON object per record
    * @since 2.2.0
    */
-  def json(jsonDataset: Dataset[String]): DataFrame = {
-    val parsedOptions = new JSONOptions(
-      extraOptions.toMap,
-      sparkSession.sessionState.conf.sessionLocalTimeZone,
-      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-
-    userSpecifiedSchema.foreach(checkJsonSchema)
-    val schema = userSpecifiedSchema.map {
-      case s if !SQLConf.get.getConf(
-        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => 
s.asNullable
-      case other => other
-    }.getOrElse {
-      TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
-    }
-
-    ExprUtils.verifyColumnNameOfCorruptRecord(schema, 
parsedOptions.columnNameOfCorruptRecord)
-    val actualSchema =
-      StructType(schema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
-
-    val createParser = CreateJacksonParser.string _
-    val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val rawParser = new JacksonParser(actualSchema, parsedOptions, 
allowArrayAsStructs = true)
-      val parser = new FailureSafeParser[String](
-        input => rawParser.parse(input, createParser, UTF8String.fromString),
-        parsedOptions.parseMode,
-        schema,
-        parsedOptions.columnNameOfCorruptRecord)
-      iter.flatMap(parser.parse)
-    }
-    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
jsonDataset.isStreaming)
-  }
+  def json(jsonDataset: DS[String] ): DS[Row]
 
   /**
    * Loads a CSV file and returns the result as a `DataFrame`. See the 
documentation on the
@@ -445,7 +308,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 2.0.0
    */
-  def csv(path: String): DataFrame = {
+  def csv(path: String): DS[Row] = {
     // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
     csv(Seq(path): _*)
   }
@@ -469,63 +332,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @param csvDataset input Dataset with one CSV row per record
    * @since 2.2.0
    */
-  def csv(csvDataset: Dataset[String]): DataFrame = {
-    val parsedOptions: CSVOptions = new CSVOptions(
-      extraOptions.toMap,
-      sparkSession.sessionState.conf.csvColumnPruning,
-      sparkSession.sessionState.conf.sessionLocalTimeZone)
-    val filteredLines: Dataset[String] =
-      CSVUtils.filterCommentAndEmpty(csvDataset, parsedOptions)
-
-    // For performance, short-circuit the collection of the first line when it 
won't be used:
-    //   - TextInputCSVDataSource - Only uses firstLine to infer an 
unspecified schema
-    //   - CSVHeaderChecker       - Only uses firstLine to check header, when 
headerFlag is true
-    //   - CSVUtils               - Only uses firstLine to filter headers, 
when headerFlag is true
-    // (If the downstream logic grows more complicated, consider refactoring 
to an approach that
-    //  delegates this decision to the constituent consumers themselves.)
-    val maybeFirstLine: Option[String] =
-      if (userSpecifiedSchema.isEmpty || parsedOptions.headerFlag) {
-        filteredLines.take(1).headOption
-      } else {
-        None
-      }
-
-    val schema = userSpecifiedSchema.map {
-      case s if !SQLConf.get.getConf(
-        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => 
s.asNullable
-      case other => other
-    }.getOrElse {
-      TextInputCSVDataSource.inferFromDataset(
-        sparkSession,
-        csvDataset,
-        maybeFirstLine,
-        parsedOptions)
-    }
-
-    ExprUtils.verifyColumnNameOfCorruptRecord(schema, 
parsedOptions.columnNameOfCorruptRecord)
-    val actualSchema =
-      StructType(schema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
-
-    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
-      val headerChecker = new CSVHeaderChecker(
-        actualSchema,
-        parsedOptions,
-        source = s"CSV source: $csvDataset")
-      headerChecker.checkHeaderColumnNames(firstLine)
-      filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, 
parsedOptions))
-    }.getOrElse(filteredLines.rdd)
-
-    val parsed = linesWithoutHeader.mapPartitions { iter =>
-      val rawParser = new UnivocityParser(actualSchema, parsedOptions)
-      val parser = new FailureSafeParser[String](
-        input => rawParser.parse(input),
-        parsedOptions.parseMode,
-        schema,
-        parsedOptions.columnNameOfCorruptRecord)
-      iter.flatMap(parser.parse)
-    }
-    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
csvDataset.isStreaming)
-  }
+  def csv(csvDataset: DS[String]): DS[Row]
 
   /**
    * Loads CSV files and returns the result as a `DataFrame`.
@@ -541,7 +348,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def csv(paths: String*): DataFrame = format("csv").load(paths : _*)
+  def csv(paths: String*): DS[Row] = format("csv").load(paths : _*)
 
   /**
    * Loads a XML file and returns the result as a `DataFrame`. See the 
documentation on the
@@ -549,7 +356,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 4.0.0
    */
-  def xml(path: String): DataFrame = {
+  def xml(path: String): DS[Row] = {
     // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
     xml(Seq(path): _*)
   }
@@ -568,8 +375,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @since 4.0.0
    */
   @scala.annotation.varargs
-  def xml(paths: String*): DataFrame = {
-    userSpecifiedSchema.foreach(checkXmlSchema)
+  def xml(paths: String*): DS[Row] = {
+    validateXmlSchema()
     format("xml").load(paths: _*)
   }
 
@@ -582,37 +389,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @param xmlDataset input Dataset with one XML object per record
    * @since 4.0.0
    */
-  def xml(xmlDataset: Dataset[String]): DataFrame = {
-    val parsedOptions: XmlOptions = new XmlOptions(
-      extraOptions.toMap,
-      sparkSession.sessionState.conf.sessionLocalTimeZone,
-      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-
-    userSpecifiedSchema.foreach(checkXmlSchema)
-
-    val schema = userSpecifiedSchema.map {
-      case s if !SQLConf.get.getConf(
-        SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => 
s.asNullable
-      case other => other
-    }.getOrElse {
-      TextInputXmlDataSource.inferFromDataset(xmlDataset, parsedOptions)
-    }
-
-    ExprUtils.verifyColumnNameOfCorruptRecord(schema, 
parsedOptions.columnNameOfCorruptRecord)
-    val actualSchema =
-      StructType(schema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
-
-    val parsed = xmlDataset.rdd.mapPartitions { iter =>
-      val rawParser = new StaxXmlParser(actualSchema, parsedOptions)
-      val parser = new FailureSafeParser[String](
-        input => rawParser.parse(input),
-        parsedOptions.parseMode,
-        schema,
-        parsedOptions.columnNameOfCorruptRecord)
-      iter.flatMap(parser.parse)
-    }
-    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
xmlDataset.isStreaming)
-  }
+  def xml(xmlDataset: DS[String]): DS[Row]
 
   /**
    * Loads a Parquet file, returning the result as a `DataFrame`. See the 
documentation
@@ -620,7 +397,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 2.0.0
    */
-  def parquet(path: String): DataFrame = {
+  def parquet(path: String): DS[Row] = {
     // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
     parquet(Seq(path): _*)
   }
@@ -636,9 +413,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @since 1.4.0
    */
   @scala.annotation.varargs
-  def parquet(paths: String*): DataFrame = {
-    format("parquet").load(paths: _*)
-  }
+  def parquet(paths: String*): DS[Row] = format("parquet").load(paths: _*)
 
   /**
    * Loads an ORC file and returns the result as a `DataFrame`.
@@ -646,7 +421,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @param path input path
    * @since 1.5.0
    */
-  def orc(path: String): DataFrame = {
+  def orc(path: String): DS[Row] = {
     // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
     orc(Seq(path): _*)
   }
@@ -663,7 +438,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
+  def orc(paths: String*): DS[Row] = format("orc").load(paths: _*)
 
   /**
    * Returns the specified table/view as a `DataFrame`. If it's a table, it 
must support batch
@@ -678,13 +453,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *                  Note that, the global temporary view database is also 
valid here.
    * @since 1.4.0
    */
-  def table(tableName: String): DataFrame = {
-    assertNoSpecifiedSchema("table")
-    val multipartIdentifier =
-      sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
-    Dataset.ofRows(sparkSession, UnresolvedRelation(multipartIdentifier,
-      new CaseInsensitiveStringMap(extraOptions.toMap.asJava)))
-  }
+  def table(tableName: String): DS[Row]
 
   /**
    * Loads text files and returns a `DataFrame` whose schema starts with a 
string column named
@@ -693,7 +462,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    *
    * @since 2.0.0
    */
-  def text(path: String): DataFrame = {
+  def text(path: String): DS[Row] = {
     // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
     text(Seq(path): _*)
   }
@@ -720,14 +489,14 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @since 1.6.0
    */
   @scala.annotation.varargs
-  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+  def text(paths: String*): DS[Row] = format("text").load(paths : _*)
 
   /**
    * Loads text files and returns a [[Dataset]] of String. See the 
documentation on the
    * other overloaded `textFile()` method for more details.
    * @since 2.0.0
    */
-  def textFile(path: String): Dataset[String] = {
+  def textFile(path: String): DS[String] = {
     // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
     textFile(Seq(path): _*)
   }
@@ -755,17 +524,17 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def textFile(paths: String*): Dataset[String] = {
+  def textFile(paths: String*): DS[String] = {
     assertNoSpecifiedSchema("textFile")
-    text(paths : 
_*).select("value").as[String](sparkSession.implicits.newStringEncoder)
+    text(paths : _*).select("value").as(StringEncoder)
   }
 
   /**
    * A convenient function for schema validation in APIs.
    */
-  private def assertNoSpecifiedSchema(operation: String): Unit = {
+  protected def assertNoSpecifiedSchema(operation: String): Unit = {
     if (userSpecifiedSchema.nonEmpty) {
-      throw 
QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
+      throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
     }
   }
 
@@ -773,21 +542,19 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * Ensure that the `singleVariantColumn` option cannot be used if there is 
also a user specified
    * schema.
    */
-  private def validateSingleVariantColumn(): Unit = {
-    if (extraOptions.get(JSONOptions.SINGLE_VARIANT_COLUMN).isDefined &&
-      userSpecifiedSchema.isDefined) {
-      throw QueryCompilationErrors.invalidSingleVariantColumn()
-    }
-  }
+  protected def validateSingleVariantColumn(): Unit = ()
+
+  protected def validateJsonSchema(): Unit = ()
+
+  protected def validateXmlSchema(): Unit = ()
 
   
///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   
///////////////////////////////////////////////////////////////////////////////////////
 
-  private var source: String = 
sparkSession.sessionState.conf.defaultDataSourceName
-
-  private var userSpecifiedSchema: Option[StructType] = None
+  protected var source: String = _
 
-  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
+  protected var userSpecifiedSchema: Option[StructType] = None
 
+  protected var extraOptions: CaseInsensitiveMap[String] = 
CaseInsensitiveMap[String](Map.empty)
 }
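
For readers scanning the diff: the shared reader in sql/api above follows a
template-method shape. Builder state (source, userSpecifiedSchema, extraOptions)
and no-op validation hooks live in the shared class, and only the pieces that
differ per backend (such as table) are left abstract. A minimal, hypothetical
sketch of that shape; the names and signatures below are illustrative and are
not the actual org.apache.spark.sql.api classes:

    // Hypothetical sketch only; not the real shared interface.
    abstract class SharedReader[DS[_]] {
      protected var source: String = _
      protected var extraOptions: Map[String, String] = Map.empty

      def format(src: String): this.type = { source = src; this }

      def option(key: String, value: String): this.type = {
        extraOptions += (key -> value)
        validateSingleVariantColumn() // no-op here; a concrete reader may enforce rules
        this
      }

      // Hook that intentionally does nothing in the shared layer.
      protected def validateSingleVariantColumn(): Unit = ()

      // Left abstract so each backend supplies its own implementation.
      def table(tableName: String): DS[Any]
    }
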
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 0c173d9da498..633a54e32bc8 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -385,6 +385,18 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]] 
extends Serializable with C
   @scala.annotation.varargs
   def addArtifacts(uri: URI*): Unit
 
+  /**
+   * Returns a [[DataFrameReader]] that can be used to read non-streaming data 
in as a
+   * `DataFrame`.
+   * {{{
+   *   sparkSession.read.parquet("/path/to/file.parquet")
+   *   sparkSession.read.schema(schema).json("/path/to/file.json")
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  def read: DataFrameReader[DS]
+
   /**
    * Executes some code block and prints to stdout the time taken to execute 
the block. This is
    * available in Scala only and is used primarily for interactive testing and 
debugging.
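
The abstract read method added above relies on covariant return-type
overriding: a concrete session can expose its own reader class while still
satisfying the shared contract. A self-contained toy example of that mechanism,
with all names made up for illustration:

    // Toy illustration of return-type narrowing; not the Spark sources.
    trait AbstractReader { def format(src: String): this.type }
    class ClassicReader extends AbstractReader {
      def format(src: String): this.type = this
    }
    trait AbstractSession { def read: AbstractReader }
    class ClassicSession extends AbstractSession {
      // Narrowed return type: callers of ClassicSession see ClassicReader directly.
      override def read: ClassicReader = new ClassicReader
    }
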
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9d7a765a24c9..f105a77cf253 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -24,13 +24,12 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.Partition
 import org.apache.spark.annotation.Stable
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, 
UnivocityParser}
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, 
JSONOptions}
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CharVarcharUtils, FailureSafeParser}
+import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, XmlOptions}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -54,136 +53,42 @@ import org.apache.spark.unsafe.types.UTF8String
  * @since 1.4.0
  */
 @Stable
-class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging 
{
+class DataFrameReader private[sql](sparkSession: SparkSession)
+  extends api.DataFrameReader[Dataset] {
+  format(sparkSession.sessionState.conf.defaultDataSourceName)
 
-  /**
-   * Specifies the input data source format.
-   *
-   * @since 1.4.0
-   */
-  def format(source: String): DataFrameReader = {
-    this.source = source
-    this
-  }
+  /** @inheritdoc */
+  override def format(source: String): this.type = super.format(source)
 
-  /**
-   * Specifies the input schema. Some data sources (e.g. JSON) can infer the 
input schema
-   * automatically from data. By specifying the schema here, the underlying 
data source can
-   * skip the schema inference step, and thus speed up data loading.
-   *
-   * @since 1.4.0
-   */
-  def schema(schema: StructType): DataFrameReader = {
-    if (schema != null) {
-      val replaced = 
CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
-      this.userSpecifiedSchema = Option(replaced)
-      validateSingleVariantColumn()
-    }
-    this
-  }
+  /** @inheritdoc */
+  override def schema(schema: StructType): this.type = super.schema(schema)
 
-  /**
-   * Specifies the schema by using the input DDL-formatted string. Some data 
sources (e.g. JSON) can
-   * infer the input schema automatically from data. By specifying the schema 
here, the underlying
-   * data source can skip the schema inference step, and thus speed up data 
loading.
-   *
-   * {{{
-   *   spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")
-   * }}}
-   *
-   * @since 2.3.0
-   */
-  def schema(schemaString: String): DataFrameReader = {
-    schema(StructType.fromDDL(schemaString))
-  }
+  /** @inheritdoc */
+  override def schema(schemaString: String): this.type = 
super.schema(schemaString)
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 1.4.0
-   */
-  def option(key: String, value: String): DataFrameReader = {
-    this.extraOptions = this.extraOptions + (key -> value)
-    validateSingleVariantColumn()
-    this
-  }
+  /** @inheritdoc */
+  override def option(key: String, value: String): this.type = 
super.option(key, value)
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Boolean): DataFrameReader = option(key, 
value.toString)
+  /** @inheritdoc */
+  override def option(key: String, value: Boolean): this.type = 
super.option(key, value)
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Long): DataFrameReader = option(key, 
value.toString)
+  /** @inheritdoc */
+  override def option(key: String, value: Long): this.type = super.option(key, 
value)
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Double): DataFrameReader = option(key, 
value.toString)
+  /** @inheritdoc */
+  override def option(key: String, value: Double): this.type = 
super.option(key, value)
 
-  /**
-   * (Scala-specific) Adds input options for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 1.4.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataFrameReader 
= {
-    this.extraOptions ++= options
-    validateSingleVariantColumn()
-    this
-  }
+  /** @inheritdoc */
+  override def options(options: scala.collection.Map[String, String]): 
this.type =
+    super.options(options)
 
-  /**
-   * Adds input options for the underlying data source.
-   *
-   * All options are maintained in a case-insensitive way in terms of key 
names.
-   * If a new option has the same key case-insensitively, it will override the 
existing option.
-   *
-   * @since 1.4.0
-   */
-  def options(options: java.util.Map[String, String]): DataFrameReader = {
-    this.options(options.asScala)
-    this
-  }
+  /** @inheritdoc */
+  override def options(options: java.util.Map[String, String]): this.type = 
super.options(options)
 
-  /**
-   * Loads input in as a `DataFrame`, for data sources that don't require a 
path (e.g. external
-   * key-value stores).
-   *
-   * @since 1.4.0
-   */
-  def load(): DataFrame = {
-    load(Seq.empty: _*) // force invocation of `load(...varargs...)`
-  }
+  /** @inheritdoc */
+  override def load(): DataFrame = load(Nil: _*)
 
-  /**
-   * Loads input in as a `DataFrame`, for data sources that require a path 
(e.g. data backed by
-   * a local or distributed file system).
-   *
-   * @since 1.4.0
-   */
+  /** @inheritdoc */
   def load(path: String): DataFrame = {
     // force invocation of `load(...varargs...)`
     if (sparkSession.sessionState.conf.legacyPathOptionBehavior) {
@@ -193,12 +98,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     }
   }
 
-  /**
-   * Loads input in as a `DataFrame`, for data sources that support multiple 
paths.
-   * Only works if the source is a HadoopFsRelationProvider.
-   *
-   * @since 1.6.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
   def load(paths: String*): DataFrame = {
     if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
@@ -235,90 +135,22 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
         options = finalOptions.originalMap).resolveRelation())
   }
 
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table and connection properties.
-   *
-   * You can find the JDBC-specific option and parameter documentation for 
reading tables
-   * via JDBC in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 1.4.0
-   */
-  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
-    assertNoSpecifiedSchema("jdbc")
-    // properties should override settings in extraOptions.
-    this.extraOptions ++= properties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, 
JDBCOptions.JDBC_TABLE_NAME -> table)
-    format("jdbc").load()
-  }
+  /** @inheritdoc */
+  override def jdbc(url: String, table: String, properties: Properties): 
DataFrame =
+    super.jdbc(url, table, properties)
 
-  // scalastyle:off line.size.limit
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table. Partitions of the table will be retrieved in parallel 
based on the parameters
-   * passed to this function.
-   *
-   * Don't create too many partitions in parallel on a large cluster; 
otherwise Spark might crash
-   * your external database systems.
-   *
-   * You can find the JDBC-specific option and parameter documentation for 
reading tables via JDBC in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @param table Name of the table in the external database.
-   * @param columnName Alias of `partitionColumn` option. Refer to 
`partitionColumn` in
-   *                   <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   *                     Data Source Option</a> in the version you use.
-   * @param connectionProperties JDBC database connection arguments, a list of 
arbitrary string
-   *                             tag/value. Normally at least a "user" and 
"password" property
-   *                             should be included. "fetchsize" can be used 
to control the
-   *                             number of rows per fetch and "queryTimeout" 
can be used to wait
-   *                             for a Statement object to execute to the 
given number of seconds.
-   * @since 1.4.0
-   */
-  // scalastyle:on line.size.limit
-  def jdbc(
+  /** @inheritdoc */
+  override def jdbc(
       url: String,
       table: String,
       columnName: String,
       lowerBound: Long,
       upperBound: Long,
       numPartitions: Int,
-      connectionProperties: Properties): DataFrame = {
-    // columnName, lowerBound, upperBound and numPartitions override settings 
in extraOptions.
-    this.extraOptions ++= Map(
-      JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
-      JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
-      JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
-      JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
-    jdbc(url, table, connectionProperties)
-  }
+      connectionProperties: Properties): DataFrame =
+    super.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, 
connectionProperties)
 
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table using connection properties. The `predicates` parameter 
gives a list
-   * expressions suitable for inclusion in WHERE clauses; each one defines one 
partition
-   * of the `DataFrame`.
-   *
-   * Don't create too many partitions in parallel on a large cluster; 
otherwise Spark might crash
-   * your external database systems.
-   *
-   * You can find the JDBC-specific option and parameter documentation for 
reading tables
-   * via JDBC in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @param table Name of the table in the external database.
-   * @param predicates Condition in the where clause for each partition.
-   * @param connectionProperties JDBC database connection arguments, a list of 
arbitrary string
-   *                             tag/value. Normally at least a "user" and 
"password" property
-   *                             should be included. "fetchsize" can be used 
to control the
-   *                             number of rows per fetch.
-   * @since 1.4.0
-   */
+  /** @inheritdoc */
   def jdbc(
       url: String,
       table: String,
@@ -335,38 +167,12 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     sparkSession.baseRelationToDataFrame(relation)
   }
 
-  /**
-   * Loads a JSON file and returns the results as a `DataFrame`.
-   *
-   * See the documentation on the overloaded `json()` method with varargs for 
more details.
-   *
-   * @since 1.4.0
-   */
-  def json(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    json(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def json(path: String): DataFrame = super.json(path)
 
-  /**
-   * Loads JSON files and returns the results as a `DataFrame`.
-   *
-   * <a href="http://jsonlines.org/";>JSON Lines</a> (newline-delimited JSON) 
is supported by
-   * default. For JSON (one record per file), set the `multiLine` option to 
true.
-   *
-   * This function goes through the input once to determine the input schema. 
If you know the
-   * schema in advance, use the version that specifies the schema to avoid the 
extra scan.
-   *
-   * You can find the JSON-specific options for reading JSON files in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def json(paths: String*): DataFrame = {
-    userSpecifiedSchema.foreach(checkJsonSchema)
-    format("json").load(paths : _*)
-  }
+  override def json(paths: String*): DataFrame = super.json(paths: _*)
 
   /**
    * Loads a `JavaRDD[String]` storing JSON objects (<a 
href="http://jsonlines.org/";>JSON
@@ -397,16 +203,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
   }
 
-  /**
-   * Loads a `Dataset[String]` storing JSON objects (<a 
href="http://jsonlines.org/";>JSON Lines
-   * text format or newline-delimited JSON</a>) and returns the result as a 
`DataFrame`.
-   *
-   * Unless the schema is specified using `schema` function, this function 
goes through the
-   * input once to determine the input schema.
-   *
-   * @param jsonDataset input Dataset with one JSON object per record
-   * @since 2.2.0
-   */
+  /** @inheritdoc */
   def json(jsonDataset: Dataset[String]): DataFrame = {
     val parsedOptions = new JSONOptions(
       extraOptions.toMap,
@@ -439,36 +236,10 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
jsonDataset.isStreaming)
   }
 
-  /**
-   * Loads a CSV file and returns the result as a `DataFrame`. See the 
documentation on the
-   * other overloaded `csv()` method for more details.
-   *
-   * @since 2.0.0
-   */
-  def csv(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    csv(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def csv(path: String): DataFrame = super.csv(path)
 
-  /**
-   * Loads an `Dataset[String]` storing CSV rows and returns the result as a 
`DataFrame`.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` 
option is enabled,
-   * this function goes through the input once to determine the input schema.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` 
option is disabled,
-   * it determines the columns as string types and it reads only the first 
line to determine the
-   * names and the number of fields.
-   *
-   * If the enforceSchema is set to `false`, only the CSV header in the first 
line is checked
-   * to conform specified or inferred schema.
-   *
-   * @note if `header` option is set to `true` when calling this API, all 
lines same with
-   * the header will be removed if exists.
-   *
-   * @param csvDataset input Dataset with one CSV row per record
-   * @since 2.2.0
-   */
+  /** @inheritdoc */
   def csv(csvDataset: Dataset[String]): DataFrame = {
     val parsedOptions: CSVOptions = new CSVOptions(
       extraOptions.toMap,
@@ -527,61 +298,18 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
csvDataset.isStreaming)
   }
 
-  /**
-   * Loads CSV files and returns the result as a `DataFrame`.
-   *
-   * This function will go through the input once to determine the input 
schema if `inferSchema`
-   * is enabled. To avoid going through the entire data once, disable 
`inferSchema` option or
-   * specify the schema explicitly using `schema`.
-   *
-   * You can find the CSV-specific options for reading CSV files in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def csv(paths: String*): DataFrame = format("csv").load(paths : _*)
+  override def csv(paths: String*): DataFrame = super.csv(paths: _*)
 
-  /**
-   * Loads a XML file and returns the result as a `DataFrame`. See the 
documentation on the
-   * other overloaded `xml()` method for more details.
-   *
-   * @since 4.0.0
-   */
-  def xml(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    xml(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def xml(path: String): DataFrame = super.xml(path)
 
-  /**
-   * Loads XML files and returns the result as a `DataFrame`.
-   *
-   * This function will go through the input once to determine the input 
schema if `inferSchema`
-   * is enabled. To avoid going through the entire data once, disable 
`inferSchema` option or
-   * specify the schema explicitly using `schema`.
-   *
-   * You can find the XML-specific options for reading XML files in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 4.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def xml(paths: String*): DataFrame = {
-    userSpecifiedSchema.foreach(checkXmlSchema)
-    format("xml").load(paths: _*)
-  }
+  override def xml(paths: String*): DataFrame = super.xml(paths: _*)
 
-  /**
-   * Loads an `Dataset[String]` storing XML object and returns the result as a 
`DataFrame`.
-   *
-   * If the schema is not specified using `schema` function and `inferSchema` 
option is enabled,
-   * this function goes through the input once to determine the input schema.
-   *
-   * @param xmlDataset input Dataset with one XML object per record
-   * @since 4.0.0
-   */
+  /** @inheritdoc */
   def xml(xmlDataset: Dataset[String]): DataFrame = {
     val parsedOptions: XmlOptions = new XmlOptions(
       extraOptions.toMap,
@@ -614,70 +342,21 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = 
xmlDataset.isStreaming)
   }
 
-  /**
-   * Loads a Parquet file, returning the result as a `DataFrame`. See the 
documentation
-   * on the other overloaded `parquet()` method for more details.
-   *
-   * @since 2.0.0
-   */
-  def parquet(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    parquet(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def parquet(path: String): DataFrame = super.parquet(path)
 
-  /**
-   * Loads a Parquet file, returning the result as a `DataFrame`.
-   *
-   * Parquet-specific option(s) for reading Parquet files can be found in
-   * <a href=
-   *   
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 1.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def parquet(paths: String*): DataFrame = {
-    format("parquet").load(paths: _*)
-  }
+  override def parquet(paths: String*): DataFrame = super.parquet(paths: _*)
 
-  /**
-   * Loads an ORC file and returns the result as a `DataFrame`.
-   *
-   * @param path input path
-   * @since 1.5.0
-   */
-  def orc(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    orc(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def orc(path: String): DataFrame = super.orc(path)
 
-  /**
-   * Loads ORC files and returns the result as a `DataFrame`.
-   *
-   * ORC-specific option(s) for reading ORC files can be found in
-   * <a href=
-   *   
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @param paths input paths
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
+  override def orc(paths: String*): DataFrame = super.orc(paths: _*)
 
-  /**
-   * Returns the specified table/view as a `DataFrame`. If it's a table, it 
must support batch
-   * reading and the returned DataFrame is the batch scan query plan of this 
table. If it's a view,
-   * the returned DataFrame is simply the query plan of the view, which can 
either be a batch or
-   * streaming query plan.
-   *
-   * @param tableName is either a qualified or unqualified name that 
designates a table or view.
-   *                  If a database is specified, it identifies the table/view 
from the database.
-   *                  Otherwise, it first attempts to find a temporary view 
with the given name
-   *                  and then match the table/view from the current database.
-   *                  Note that, the global temporary view database is also 
valid here.
-   * @since 1.4.0
-   */
+  /** @inheritdoc */
   def table(tableName: String): DataFrame = {
     assertNoSpecifiedSchema("table")
     val multipartIdentifier =
@@ -686,108 +365,31 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
       new CaseInsensitiveStringMap(extraOptions.toMap.asJava)))
   }
 
-  /**
-   * Loads text files and returns a `DataFrame` whose schema starts with a 
string column named
-   * "value", and followed by partitioned columns if there are any. See the 
documentation on
-   * the other overloaded `text()` method for more details.
-   *
-   * @since 2.0.0
-   */
-  def text(path: String): DataFrame = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    text(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def text(path: String): DataFrame = super.text(path)
 
-  /**
-   * Loads text files and returns a `DataFrame` whose schema starts with a 
string column named
-   * "value", and followed by partitioned columns if there are any.
-   * The text files must be encoded as UTF-8.
-   *
-   * By default, each line in the text files is a new row in the resulting 
DataFrame. For example:
-   * {{{
-   *   // Scala:
-   *   spark.read.text("/path/to/spark/README.md")
-   *
-   *   // Java:
-   *   spark.read().text("/path/to/spark/README.md")
-   * }}}
-   *
-   * You can find the text-specific options for reading text files in
-   * <a 
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option";>
-   *   Data Source Option</a> in the version you use.
-   *
-   * @param paths input paths
-   * @since 1.6.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+  override def text(paths: String*): DataFrame = super.text(paths: _*)
 
-  /**
-   * Loads text files and returns a [[Dataset]] of String. See the 
documentation on the
-   * other overloaded `textFile()` method for more details.
-   * @since 2.0.0
-   */
-  def textFile(path: String): Dataset[String] = {
-    // This method ensures that calls that explicit need single argument 
works, see SPARK-16009
-    textFile(Seq(path): _*)
-  }
+  /** @inheritdoc */
+  override def textFile(path: String): Dataset[String] = super.textFile(path)
 
-  /**
-   * Loads text files and returns a [[Dataset]] of String. The underlying 
schema of the Dataset
-   * contains a single string column named "value".
-   * The text files must be encoded as UTF-8.
-   *
-   * If the directory structure of the text files contains partitioning 
information, those are
-   * ignored in the resulting Dataset. To include partitioning information as 
columns, use `text`.
-   *
-   * By default, each line in the text files is a new row in the resulting 
DataFrame. For example:
-   * {{{
-   *   // Scala:
-   *   spark.read.textFile("/path/to/spark/README.md")
-   *
-   *   // Java:
-   *   spark.read().textFile("/path/to/spark/README.md")
-   * }}}
-   *
-   * You can set the text-specific options as specified in 
`DataFrameReader.text`.
-   *
-   * @param paths input path
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def textFile(paths: String*): Dataset[String] = {
-    assertNoSpecifiedSchema("textFile")
-    text(paths : 
_*).select("value").as[String](sparkSession.implicits.newStringEncoder)
-  }
+  override def textFile(paths: String*): Dataset[String] = 
super.textFile(paths: _*)
 
-  /**
-   * A convenient function for schema validation in APIs.
-   */
-  private def assertNoSpecifiedSchema(operation: String): Unit = {
-    if (userSpecifiedSchema.nonEmpty) {
-      throw 
QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
-    }
-  }
-
-  /**
-   * Ensure that the `singleVariantColumn` option cannot be used if there is 
also a user specified
-   * schema.
-   */
-  private def validateSingleVariantColumn(): Unit = {
+  /** @inheritdoc */
+  override protected def validateSingleVariantColumn(): Unit = {
     if (extraOptions.get(JSONOptions.SINGLE_VARIANT_COLUMN).isDefined &&
       userSpecifiedSchema.isDefined) {
       throw QueryCompilationErrors.invalidSingleVariantColumn()
     }
   }
 
-  
///////////////////////////////////////////////////////////////////////////////////////
-  // Builder pattern config options
-  
///////////////////////////////////////////////////////////////////////////////////////
-
-  private var source: String = 
sparkSession.sessionState.conf.defaultDataSourceName
-
-  private var userSpecifiedSchema: Option[StructType] = None
-
-  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
+  override protected def validateJsonSchema(): Unit =
+    userSpecifiedSchema.foreach(checkJsonSchema)
 
+  override protected def validateXmlSchema(): Unit =
+    userSpecifiedSchema.foreach(checkXmlSchema)
 }
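
One user-visible consequence of keeping this override in the Classic reader is
that a user-specified schema still cannot be combined with the
singleVariantColumn option; the check is retained by overriding the shared
validation hook. A hedged usage sketch, assuming an active SparkSession named
spark and a made-up input path (the option key shown is the one referenced by
JSONOptions.SINGLE_VARIANT_COLUMN):

    // Hypothetical usage; rejected because schema + singleVariantColumn cannot be combined.
    spark.read
      .schema("a INT")
      .option("singleVariantColumn", "v") // triggers invalidSingleVariantColumn
      .json("/path/to/data.json")
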
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 55f67da68221..906e5ede8b4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -649,16 +649,7 @@ class SparkSession private(
     artifactManager.addLocalArtifacts(uri.flatMap(Artifact.parseArtifacts))
   }
 
-  /**
-   * Returns a [[DataFrameReader]] that can be used to read non-streaming data 
in as a
-   * `DataFrame`.
-   * {{{
-   *   sparkSession.read.parquet("/path/to/file.parquet")
-   *   sparkSession.read.schema(schema).json("/path/to/file.json")
-   * }}}
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def read: DataFrameReader = new DataFrameReader(self)
 
   /**


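Most of the churn in this patch is the same mechanical pattern: the concrete
class keeps a thin forwarder marked /** @inheritdoc */ so its public signature
and documentation stay unchanged while the body moves to the shared parent. A
minimal standalone sketch of that pattern, with hypothetical names:

    // Illustrative only; mirrors the forwarding style used in the diff.
    abstract class AbstractReader {
      /** Adds an input option. */
      def option(key: String, value: String): this.type = this
    }
    class ConcreteReader extends AbstractReader {
      /** @inheritdoc */
      override def option(key: String, value: String): this.type =
        super.option(key, value)
    }
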
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
