This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a116a5bf708d [SPARK-49416][CONNECT][SQL] Add Shared DataStreamReader interface
a116a5bf708d is described below

commit a116a5bf708dbd2e0efc0b1f63f3f655d3e830da
Author: Herman van Hovell <[email protected]>
AuthorDate: Thu Sep 26 08:37:04 2024 -0400

    [SPARK-49416][CONNECT][SQL] Add Shared DataStreamReader interface
    
    ### What changes were proposed in this pull request?
    This PR adds a shared DataStreamReader interface to sql/api, and makes the Connect and Classic DataStreamReader implementations extend it.
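    
    For example, with this change the following user code (a minimal usage sketch; the path, DDL schema, and option value are placeholders) exercises the same reader contract on Classic Spark and on Spark Connect:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().getOrCreate()
    
    // readStream is now declared on the shared org.apache.spark.sql.api.SparkSession and
    // returns an implementation of the shared DataStreamReader interface.
    val stream = spark.readStream
      .format("json")
      .schema("value STRING")
      .option("maxFilesPerTrigger", "1")
      .load("/path/to/directory/of/json/files")
    ```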
    
    ### Why are the changes needed?
    We are creating a unified Scala interface for sql that is shared by the Classic and Connect implementations.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #48213 from hvanhovell/SPARK-49416.
    
    Authored-by: Herman van Hovell <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  10 +-
 .../spark/sql/streaming/DataStreamReader.scala     | 295 ++++---------------
 .../CheckConnectJvmClientCompatibility.scala       |   8 +-
 project/MimaExcludes.scala                         |   1 +
 .../apache/spark/sql/api}/DataStreamReader.scala   | 144 ++++-----
 .../org/apache/spark/sql/api/SparkSession.scala    |  11 +
 .../scala/org/apache/spark/sql/SparkSession.scala  |  10 +-
 .../spark/sql/streaming/DataStreamReader.scala     | 325 ++++-----------------
 8 files changed, 201 insertions(+), 603 deletions(-)
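
For context, the diff applies the same layering used for the other shared readers: the abstract `DataStreamReader` in `sql/api` declares the builder methods with `this.type` return types and implements the per-format shortcuts (`json`, `csv`, `parquet`, ...) on top of them, while the Connect client and the Classic `sql/core` readers implement only the engine-specific primitives and add thin covariant overrides. A minimal sketch of that pattern (illustrative names, not the actual Spark classes):

```scala
// Abstract side: builder methods return this.type so every subclass keeps its own type
// in fluent call chains, and the shortcuts are written once against the primitives.
abstract class AbstractStreamReader {
  def format(source: String): this.type
  def option(key: String, value: String): this.type
  def option(key: String, value: Long): this.type = option(key, value.toString)
  def load(path: String): Unit
  def json(path: String): Unit = format("json").load(path)
}

// Concrete side: only the engine-specific pieces live here.
class ClientStreamReader extends AbstractStreamReader {
  private val options = scala.collection.mutable.Map.empty[String, String]
  private var source: Option[String] = None

  override def format(src: String): this.type = { source = Some(src); this }
  override def option(key: String, value: String): this.type = { options += key -> value; this }
  override def load(path: String): Unit =
    println(s"loading $path with format=$source options=$options")
}
```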

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5313369a2c98..1b41566ca1d1 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -209,15 +209,7 @@ class SparkSession private[sql] (
   /** @inheritdoc */
   def read: DataFrameReader = new DataFrameReader(this)
 
-  /**
-   * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
-   * {{{
-   *   sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
-   *   sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
-   * }}}
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def readStream: DataStreamReader = new DataStreamReader(this)
 
   lazy val streams: StreamingQueryManager = new StreamingQueryManager(this)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 789425c9daea..2ff34a634364 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -21,11 +21,9 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.connect.proto.Read.DataSource
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.{api, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.ConnectConversions._
+import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -35,101 +33,49 @@ import org.apache.spark.sql.types.StructType
  * @since 3.5.0
  */
 @Evolving
-final class DataStreamReader private[sql] (sparkSession: SparkSession) extends Logging {
+final class DataStreamReader private[sql] (sparkSession: SparkSession)
+    extends api.DataStreamReader {
 
-  /**
-   * Specifies the input data source format.
-   *
-   * @since 3.5.0
-   */
-  def format(source: String): DataStreamReader = {
+  private val sourceBuilder = DataSource.newBuilder()
+
+  /** @inheritdoc */
+  def format(source: String): this.type = {
     sourceBuilder.setFormat(source)
     this
   }
 
-  /**
-   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
-   * automatically from data. By specifying the schema here, the underlying data source can skip
-   * the schema inference step, and thus speed up data loading.
-   *
-   * @since 3.5.0
-   */
-  def schema(schema: StructType): DataStreamReader = {
+  /** @inheritdoc */
+  def schema(schema: StructType): this.type = {
     if (schema != null) {
      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retail all the attributes.
     }
     this
   }
 
-  /**
-   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
-   * can infer the input schema automatically from data. By specifying the schema here, the
-   * underlying data source can skip the schema inference step, and thus speed up data loading.
-   *
-   * @since 3.5.0
-   */
-  def schema(schemaString: String): DataStreamReader = {
+  /** @inheritdoc */
+  override def schema(schemaString: String): this.type = {
     sourceBuilder.setSchema(schemaString)
     this
   }
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * @since 3.5.0
-   */
-  def option(key: String, value: String): DataStreamReader = {
+  /** @inheritdoc */
+  def option(key: String, value: String): this.type = {
     sourceBuilder.putOptions(key, value)
     this
   }
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * @since 3.5.0
-   */
-  def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
-
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * @since 3.5.0
-   */
-  def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
-
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * @since 3.5.0
-   */
-  def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
-
-  /**
-   * (Scala-specific) Adds input options for the underlying data source.
-   *
-   * @since 3.5.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+  /** @inheritdoc */
+  def options(options: scala.collection.Map[String, String]): this.type = {
     this.options(options.asJava)
-    this
   }
 
-  /**
-   * (Java-specific) Adds input options for the underlying data source.
-   *
-   * @since 3.5.0
-   */
-  def options(options: java.util.Map[String, String]): DataStreamReader = {
+  /** @inheritdoc */
+  override def options(options: java.util.Map[String, String]): this.type = {
     sourceBuilder.putAllOptions(options)
     this
   }
 
-  /**
-   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g.
-   * external key-value stores).
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def load(): DataFrame = {
     sparkSession.newDataFrame { relationBuilder =>
       relationBuilder.getReadBuilder
@@ -138,120 +84,14 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
     }
   }
 
-  /**
-   * Loads input in as a `DataFrame`, for data streams that read from some path.
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def load(path: String): DataFrame = {
     sourceBuilder.clearPaths()
     sourceBuilder.addPaths(path)
     load()
   }
 
-  /**
-   * Loads a JSON file stream and returns the results as a `DataFrame`.
-   *
-   * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
-   * default. For JSON (one record per file), set the `multiLine` option to true.
-   *
-   * This function goes through the input once to determine the input schema. If you know the
-   * schema in advance, use the version that specifies the schema to avoid the extra scan.
-   *
-   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
-   * sets the maximum number of new files to be considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
-   * be considered in every trigger.</li> </ul>
-   *
-   * You can find the JSON-specific options for reading JSON file stream in <a
-   * href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.5.0
-   */
-  def json(path: String): DataFrame = {
-    format("json").load(path)
-  }
-
-  /**
-   * Loads a CSV file stream and returns the result as a `DataFrame`.
-   *
-   * This function will go through the input once to determine the input 
schema if `inferSchema`
-   * is enabled. To avoid going through the entire data once, disable 
`inferSchema` option or
-   * specify the schema explicitly using `schema`.
-   *
-   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
-   * sets the maximum number of new files to be considered in every 
trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files to
-   * be considered in every trigger.</li> </ul>
-   *
-   * You can find the CSV-specific options for reading CSV file stream in <a
-   * 
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option";>
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.5.0
-   */
-  def csv(path: String): DataFrame = format("csv").load(path)
-
-  /**
-   * Loads a XML file stream and returns the result as a `DataFrame`.
-   *
-   * This function will go through the input once to determine the input schema if `inferSchema`
-   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
-   * specify the schema explicitly using `schema`.
-   *
-   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
-   * sets the maximum number of new files to be considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
-   * be considered in every trigger.</li> </ul>
-   *
-   * You can find the XML-specific options for reading XML file stream in <a
-   * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 4.0.0
-   */
-  def xml(path: String): DataFrame = format("xml").load(path)
-
-  /**
-   * Loads a ORC file stream, returning the result as a `DataFrame`.
-   *
-   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
-   * sets the maximum number of new files to be considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
-   * be considered in every trigger.</li> </ul>
-   *
-   * ORC-specific option(s) for reading ORC file stream can be found in <a href=
-   * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data
-   * Source Option</a> in the version you use.
-   *
-   * @since 3.5.0
-   */
-  def orc(path: String): DataFrame = format("orc").load(path)
-
-  /**
-   * Loads a Parquet file stream, returning the result as a `DataFrame`.
-   *
-   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
-   * sets the maximum number of new files to be considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
-   * be considered in every trigger.</li> </ul>
-   *
-   * Parquet-specific option(s) for reading Parquet file stream can be found in <a href=
-   * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data
-   * Source Option</a> in the version you use.
-   *
-   * @since 3.5.0
-   */
-  def parquet(path: String): DataFrame = format("parquet").load(path)
-
-  /**
-   * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
-   * support streaming mode.
-   * @param tableName
-   *   The name of the table
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   def table(tableName: String): DataFrame = {
     require(tableName != null, "The table name can't be null")
     sparkSession.newDataFrame { builder =>
@@ -263,59 +103,44 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
     }
   }
 
-  /**
-   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
-   * "value", and followed by partitioned columns if there are any. The text files must be encoded
-   * as UTF-8.
-   *
-   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
-   * {{{
-   *   // Scala:
-   *   spark.readStream.text("/path/to/directory/")
-   *
-   *   // Java:
-   *   spark.readStream().text("/path/to/directory/")
-   * }}}
-   *
-   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
-   * sets the maximum number of new files to be considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
-   * be considered in every trigger.</li> </ul>
-   *
-   * You can find the text-specific options for reading text files in <a
-   * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 3.5.0
-   */
-  def text(path: String): DataFrame = format("text").load(path)
-
-  /**
-   * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
-   * contains a single string column named "value". The text files must be encoded as UTF-8.
-   *
-   * If the directory structure of the text files contains partitioning information, those are
-   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
-   *
-   * By default, each line in the text file is a new element in the resulting Dataset. For
-   * example:
-   * {{{
-   *   // Scala:
-   *   spark.readStream.textFile("/path/to/spark/README.md")
-   *
-   *   // Java:
-   *   spark.readStream().textFile("/path/to/spark/README.md")
-   * }}}
-   *
-   * You can set the text-specific options as specified in `DataStreamReader.text`.
-   *
-   * @param path
-   *   input path
-   * @since 3.5.0
-   */
-  def textFile(path: String): Dataset[String] = {
-    text(path).select("value").as[String](StringEncoder)
+  override protected def assertNoSpecifiedSchema(operation: String): Unit = {
+    if (sourceBuilder.hasSchema) {
+      throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
+    }
   }
 
-  private val sourceBuilder = DataSource.newBuilder()
+  ///////////////////////////////////////////////////////////////////////////////////////
+  // Covariant overrides.
+  ///////////////////////////////////////////////////////////////////////////////////////
+
+  /** @inheritdoc */
+  override def option(key: String, value: Boolean): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def option(key: String, value: Long): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def option(key: String, value: Double): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def json(path: String): DataFrame = super.json(path)
+
+  /** @inheritdoc */
+  override def csv(path: String): DataFrame = super.csv(path)
+
+  /** @inheritdoc */
+  override def xml(path: String): DataFrame = super.xml(path)
+
+  /** @inheritdoc */
+  override def orc(path: String): DataFrame = super.orc(path)
+
+  /** @inheritdoc */
+  override def parquet(path: String): DataFrame = super.parquet(path)
+
+  /** @inheritdoc */
+  override def text(path: String): DataFrame = super.text(path)
+
+  /** @inheritdoc */
+  override def textFile(path: String): Dataset[String] = super.textFile(path)
+
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 16f6983efb18..c8776af18a14 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -304,7 +304,13 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.sql.DataFrameReader.validateJsonSchema"),
       ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.DataFrameReader.validateXmlSchema"))
+        "org.apache.spark.sql.DataFrameReader.validateXmlSchema"),
+
+      // Protected DataStreamReader methods...
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.DataStreamReader.validateJsonSchema"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.DataStreamReader.validateXmlSchema"))
 
     checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
   }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9a89ebb4797c..0bd0121e6e14 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -179,6 +179,7 @@ object MimaExcludes {
     // SPARK-49282: Shared SparkSessionBuilder
     ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SparkSession$Builder"),
   ) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
+    loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++
     loggingExcludes("org.apache.spark.sql.SparkSession#Builder")
 
   // Default exclude rules
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala
similarity index 75%
copy from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
copy to sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala
index 789425c9daea..219ecb77d403 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala
@@ -14,113 +14,94 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.spark.sql.streaming
+package org.apache.spark.sql.api
 
 import scala.jdk.CollectionConverters._
 
+import _root_.java
+
 import org.apache.spark.annotation.Evolving
-import org.apache.spark.connect.proto.Read.DataSource
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.{Encoders, Row}
 import org.apache.spark.sql.types.StructType
 
 /**
  * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
  * key-value stores, etc). Use `SparkSession.readStream` to access this.
  *
- * @since 3.5.0
+ * @since 2.0.0
  */
 @Evolving
-final class DataStreamReader private[sql] (sparkSession: SparkSession) extends Logging {
+abstract class DataStreamReader {
 
   /**
    * Specifies the input data source format.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def format(source: String): DataStreamReader = {
-    sourceBuilder.setFormat(source)
-    this
-  }
+  def format(source: String): this.type
 
   /**
   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
   * automatically from data. By specifying the schema here, the underlying data source can skip
    * the schema inference step, and thus speed up data loading.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def schema(schema: StructType): DataStreamReader = {
-    if (schema != null) {
-      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retail all the attributes.
-    }
-    this
-  }
+  def schema(schema: StructType): this.type
 
   /**
   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
   * can infer the input schema automatically from data. By specifying the schema here, the
   * underlying data source can skip the schema inference step, and thus speed up data loading.
    *
-   * @since 3.5.0
+   * @since 2.3.0
    */
-  def schema(schemaString: String): DataStreamReader = {
-    sourceBuilder.setSchema(schemaString)
-    this
+  def schema(schemaString: String): this.type = {
+    schema(StructType.fromDDL(schemaString))
   }
 
   /**
    * Adds an input option for the underlying data source.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def option(key: String, value: String): DataStreamReader = {
-    sourceBuilder.putOptions(key, value)
-    this
-  }
+  def option(key: String, value: String): this.type
 
   /**
    * Adds an input option for the underlying data source.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
+  def option(key: String, value: Boolean): this.type = option(key, value.toString)
 
   /**
    * Adds an input option for the underlying data source.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
+  def option(key: String, value: Long): this.type = option(key, value.toString)
 
   /**
    * Adds an input option for the underlying data source.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
+  def option(key: String, value: Double): this.type = option(key, value.toString)
 
   /**
    * (Scala-specific) Adds input options for the underlying data source.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def options(options: scala.collection.Map[String, String]): DataStreamReader = {
-    this.options(options.asJava)
-    this
-  }
+  def options(options: scala.collection.Map[String, String]): this.type
 
   /**
    * (Java-specific) Adds input options for the underlying data source.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def options(options: java.util.Map[String, String]): DataStreamReader = {
-    sourceBuilder.putAllOptions(options)
+  def options(options: java.util.Map[String, String]): this.type = {
+    this.options(options.asScala)
     this
   }
 
@@ -128,26 +109,16 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g.
    * external key-value stores).
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def load(): DataFrame = {
-    sparkSession.newDataFrame { relationBuilder =>
-      relationBuilder.getReadBuilder
-        .setIsStreaming(true)
-        .setDataSource(sourceBuilder.build())
-    }
-  }
+  def load(): Dataset[Row]
 
   /**
   * Loads input in as a `DataFrame`, for data streams that read from some path.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def load(path: String): DataFrame = {
-    sourceBuilder.clearPaths()
-    sourceBuilder.addPaths(path)
-    load()
-  }
+  def load(path: String): Dataset[Row]
 
   /**
    * Loads a JSON file stream and returns the results as a `DataFrame`.
@@ -167,9 +138,10 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
   * href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
    * Data Source Option</a> in the version you use.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def json(path: String): DataFrame = {
+  def json(path: String): Dataset[Row] = {
+    validateJsonSchema()
     format("json").load(path)
   }
 
@@ -189,9 +161,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
   * href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
    * Data Source Option</a> in the version you use.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def csv(path: String): DataFrame = format("csv").load(path)
+  def csv(path: String): Dataset[Row] = format("csv").load(path)
 
   /**
    * Loads a XML file stream and returns the result as a `DataFrame`.
@@ -211,7 +183,10 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
    *
    * @since 4.0.0
    */
-  def xml(path: String): DataFrame = format("xml").load(path)
+  def xml(path: String): Dataset[Row] = {
+    validateXmlSchema()
+    format("xml").load(path)
+  }
 
   /**
    * Loads a ORC file stream, returning the result as a `DataFrame`.
@@ -225,9 +200,11 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
   * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data
    * Source Option</a> in the version you use.
    *
-   * @since 3.5.0
+   * @since 2.3.0
    */
-  def orc(path: String): DataFrame = format("orc").load(path)
+  def orc(path: String): Dataset[Row] = {
+    format("orc").load(path)
+  }
 
   /**
    * Loads a Parquet file stream, returning the result as a `DataFrame`.
@@ -241,27 +218,20 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
   * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data
    * Source Option</a> in the version you use.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def parquet(path: String): DataFrame = format("parquet").load(path)
+  def parquet(path: String): Dataset[Row] = {
+    format("parquet").load(path)
+  }
 
   /**
   * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
    * support streaming mode.
    * @param tableName
    *   The name of the table
-   * @since 3.5.0
+   * @since 3.1.0
    */
-  def table(tableName: String): DataFrame = {
-    require(tableName != null, "The table name can't be null")
-    sparkSession.newDataFrame { builder =>
-      builder.getReadBuilder
-        .setIsStreaming(true)
-        .getNamedTableBuilder
-        .setUnparsedIdentifier(tableName)
-        .putAllOptions(sourceBuilder.getOptionsMap)
-    }
-  }
+  def table(tableName: String): Dataset[Row]
 
   /**
   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
@@ -286,9 +256,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
   * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
    * Data Source Option</a> in the version you use.
    *
-   * @since 3.5.0
+   * @since 2.0.0
    */
-  def text(path: String): DataFrame = format("text").load(path)
+  def text(path: String): Dataset[Row] = format("text").load(path)
 
   /**
   * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
@@ -311,11 +281,17 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
    *
    * @param path
    *   input path
-   * @since 3.5.0
+   * @since 2.1.0
    */
   def textFile(path: String): Dataset[String] = {
-    text(path).select("value").as[String](StringEncoder)
+    assertNoSpecifiedSchema("textFile")
+    text(path).select("value").as(Encoders.STRING)
   }
 
-  private val sourceBuilder = DataSource.newBuilder()
+  protected def assertNoSpecifiedSchema(operation: String): Unit
+
+  protected def validateJsonSchema(): Unit = ()
+
+  protected def validateXmlSchema(): Unit = ()
+
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 2295c153cd51..0f73a94c3c4a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -506,6 +506,17 @@ abstract class SparkSession extends Serializable with Closeable {
    */
   def read: DataFrameReader
 
+  /**
+   * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
+   * {{{
+   *   sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
+   *   sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  def readStream: DataStreamReader
+
   /**
   * (Scala-specific) Implicit methods available in Scala for converting common Scala objects into
    * `DataFrame`s.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index fe139d629eb2..983cc24718fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -739,15 +739,7 @@ class SparkSession private(
   /** @inheritdoc */
   def read: DataFrameReader = new DataFrameReader(self)
 
-  /**
-   * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
-   * {{{
-   *   sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
-   *   sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
-   * }}}
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def readStream: DataStreamReader = new DataStreamReader(self)
 
   // scalastyle:off
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 24d769fc8fc8..f42d8b667ab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -22,12 +22,12 @@ import java.util.Locale
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.Evolving
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{api, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -49,25 +49,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  * @since 2.0.0
  */
 @Evolving
-final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
-  /**
-   * Specifies the input data source format.
-   *
-   * @since 2.0.0
-   */
-  def format(source: String): DataStreamReader = {
+final class DataStreamReader private[sql](sparkSession: SparkSession) extends api.DataStreamReader {
+  /** @inheritdoc */
+  def format(source: String): this.type = {
     this.source = source
     this
   }
 
-  /**
-   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
-   * automatically from data. By specifying the schema here, the underlying data source can
-   * skip the schema inference step, and thus speed up data loading.
-   *
-   * @since 2.0.0
-   */
-  def schema(schema: StructType): DataStreamReader = {
+  /** @inheritdoc */
+  def schema(schema: StructType): this.type = {
     if (schema != null) {
      val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
       this.userSpecifiedSchema = Option(replaced)
@@ -75,75 +65,19 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     this
   }
 
-  /**
-   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON) can
-   * infer the input schema automatically from data. By specifying the schema here, the underlying
-   * data source can skip the schema inference step, and thus speed up data loading.
-   *
-   * @since 2.3.0
-   */
-  def schema(schemaString: String): DataStreamReader = {
-    schema(StructType.fromDDL(schemaString))
-  }
-
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: String): DataStreamReader = {
+  /** @inheritdoc */
+  def option(key: String, value: String): this.type = {
     this.extraOptions += (key -> value)
     this
   }
 
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
-
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
-
-  /**
-   * Adds an input option for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
-
-  /**
-   * (Scala-specific) Adds input options for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+  /** @inheritdoc */
+  def options(options: scala.collection.Map[String, String]): this.type = {
     this.extraOptions ++= options
     this
   }
 
-  /**
-   * (Java-specific) Adds input options for the underlying data source.
-   *
-   * @since 2.0.0
-   */
-  def options(options: java.util.Map[String, String]): DataStreamReader = {
-    this.options(options.asScala)
-    this
-  }
-
-
-  /**
-   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path
-   * (e.g. external key-value stores).
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def load(): DataFrame = loadInternal(None)
 
   private def loadInternal(path: Option[String]): DataFrame = {
@@ -205,11 +139,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     }
   }
 
-  /**
-   * Loads input in as a `DataFrame`, for data streams that read from some path.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def load(path: String): DataFrame = {
     if (!sparkSession.sessionState.conf.legacyPathOptionBehavior &&
         extraOptions.contains("path")) {
@@ -218,133 +148,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     loadInternal(Some(path))
   }
 
-  /**
-   * Loads a JSON file stream and returns the results as a `DataFrame`.
-   *
-   * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
-   * default. For JSON (one record per file), set the `multiLine` option to true.
-   *
-   * This function goes through the input once to determine the input schema. If you know the
-   * schema in advance, use the version that specifies the schema to avoid the extra scan.
-   *
-   * You can set the following option(s):
-   * <ul>
-   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
-   * considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
-   * to be considered in every trigger.</li>
-   * </ul>
-   *
-   * You can find the JSON-specific options for reading JSON file stream in
-   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 2.0.0
-   */
-  def json(path: String): DataFrame = {
-    userSpecifiedSchema.foreach(checkJsonSchema)
-    format("json").load(path)
-  }
-
-  /**
-   * Loads a CSV file stream and returns the result as a `DataFrame`.
-   *
-   * This function will go through the input once to determine the input schema if `inferSchema`
-   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
-   * specify the schema explicitly using `schema`.
-   *
-   * You can set the following option(s):
-   * <ul>
-   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
-   * considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
-   * to be considered in every trigger.</li>
-   * </ul>
-   *
-   * You can find the CSV-specific options for reading CSV file stream in
-   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 2.0.0
-   */
-  def csv(path: String): DataFrame = format("csv").load(path)
-
-  /**
-   * Loads a XML file stream and returns the result as a `DataFrame`.
-   *
-   * This function will go through the input once to determine the input schema if `inferSchema`
-   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
-   * specify the schema explicitly using `schema`.
-   *
-   * You can set the following option(s):
-   * <ul>
-   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
-   * considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
-   * to be considered in every trigger.</li>
-   * </ul>
-   *
-   * You can find the XML-specific options for reading XML file stream in
-   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
-   * Data Source Option</a> in the version you use.
-   *
-   * @since 4.0.0
-   */
-  def xml(path: String): DataFrame = {
-    userSpecifiedSchema.foreach(checkXmlSchema)
-    format("xml").load(path)
-  }
-
-  /**
-   * Loads a ORC file stream, returning the result as a `DataFrame`.
-   *
-   * You can set the following option(s):
-   * <ul>
-   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
-   * considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
-   * to be considered in every trigger.</li>
-   * </ul>
-   *
-   * ORC-specific option(s) for reading ORC file stream can be found in
-   * <a href=
-   *   "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option">
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 2.3.0
-   */
-  def orc(path: String): DataFrame = {
-    format("orc").load(path)
-  }
-
-  /**
-   * Loads a Parquet file stream, returning the result as a `DataFrame`.
-   *
-   * You can set the following option(s):
-   * <ul>
-   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
-   * considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
-   * to be considered in every trigger.</li>
-   * </ul>
-   *
-   * Parquet-specific option(s) for reading Parquet file stream can be found in
-   * <a href=
-   *   "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option">
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 2.0.0
-   */
-  def parquet(path: String): DataFrame = {
-    format("parquet").load(path)
-  }
-
-  /**
-   * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
-   * support streaming mode.
-   * @param tableName The name of the table
-   * @since 3.1.0
-   */
+  /** @inheritdoc */
   def table(tableName: String): DataFrame = {
     require(tableName != null, "The table name can't be null")
    val identifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
@@ -356,65 +160,56 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
         isStreaming = true))
   }
 
-  /**
-   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
-   * "value", and followed by partitioned columns if there are any.
-   * The text files must be encoded as UTF-8.
-   *
-   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
-   * {{{
-   *   // Scala:
-   *   spark.readStream.text("/path/to/directory/")
-   *
-   *   // Java:
-   *   spark.readStream().text("/path/to/directory/")
-   * }}}
-   *
-   * You can set the following option(s):
-   * <ul>
-   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
-   * considered in every trigger.</li>
-   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
-   * to be considered in every trigger.</li>
-   * </ul>
-   *
-   * You can find the text-specific options for reading text files in
-   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
-   *   Data Source Option</a> in the version you use.
-   *
-   * @since 2.0.0
-   */
-  def text(path: String): DataFrame = format("text").load(path)
-
-  /**
-   * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
-   * contains a single string column named "value".
-   * The text files must be encoded as UTF-8.
-   *
-   * If the directory structure of the text files contains partitioning information, those are
-   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
-   *
-   * By default, each line in the text file is a new element in the resulting Dataset. For example:
-   * {{{
-   *   // Scala:
-   *   spark.readStream.textFile("/path/to/spark/README.md")
-   *
-   *   // Java:
-   *   spark.readStream().textFile("/path/to/spark/README.md")
-   * }}}
-   *
-   * You can set the text-specific options as specified in `DataStreamReader.text`.
-   *
-   * @param path input path
-   * @since 2.1.0
-   */
-  def textFile(path: String): Dataset[String] = {
+  override protected def assertNoSpecifiedSchema(operation: String): Unit = {
     if (userSpecifiedSchema.nonEmpty) {
-      throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError("textFile")
+      throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
     }
-    
text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
   }
 
+  override protected def validateJsonSchema(): Unit = userSpecifiedSchema.foreach(checkJsonSchema)
+
+  override protected def validateXmlSchema(): Unit = userSpecifiedSchema.foreach(checkXmlSchema)
+
+  ///////////////////////////////////////////////////////////////////////////////////////
+  // Covariant overrides.
+  ///////////////////////////////////////////////////////////////////////////////////////
+
+  /** @inheritdoc */
+  override def schema(schemaString: String): this.type = super.schema(schemaString)
+
+  /** @inheritdoc */
+  override def option(key: String, value: Boolean): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def option(key: String, value: Long): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def option(key: String, value: Double): this.type = super.option(key, value)
+
+  /** @inheritdoc */
+  override def options(options: java.util.Map[String, String]): this.type = super.options(options)
+
+  /** @inheritdoc */
+  override def json(path: String): DataFrame = super.json(path)
+
+  /** @inheritdoc */
+  override def csv(path: String): DataFrame = super.csv(path)
+
+  /** @inheritdoc */
+  override def xml(path: String): DataFrame = super.xml(path)
+
+  /** @inheritdoc */
+  override def orc(path: String): DataFrame = super.orc(path)
+
+  /** @inheritdoc */
+  override def parquet(path: String): DataFrame = super.parquet(path)
+
+  /** @inheritdoc */
+  override def text(path: String): DataFrame = super.text(path)
+
+  /** @inheritdoc */
+  override def textFile(path: String): Dataset[String] = super.textFile(path)
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]