This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a116a5bf708d [SPARK-49416][CONNECT][SQL] Add Shared DataStreamReader interface
a116a5bf708d is described below
commit a116a5bf708dbd2e0efc0b1f63f3f655d3e830da
Author: Herman van Hovell <[email protected]>
AuthorDate: Thu Sep 26 08:37:04 2024 -0400
[SPARK-49416][CONNECT][SQL] Add Shared DataStreamReader interface
### What changes were proposed in this pull request?
This PR adds a shared DataStreamReader interface to the sql/api module. The Classic (sql/core) and Connect implementations now extend it, and common behavior (the typed `option` overloads, the DDL-string `schema` overload, and the format-specific load helpers such as `json`, `csv`, and `parquet`) moves into the shared parent.
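At a high level the refactoring follows the pattern sketched below. This is a minimal, illustrative sketch with invented names (`AbstractReader`, `ClassicReader`), not the actual code; the real parent is `org.apache.spark.sql.api.DataStreamReader`. Builder methods return `this.type`, so default implementations in the shared parent keep chaining on the concrete subclass:
```scala
// Illustrative sketch of the shared-interface pattern (names are invented).
abstract class AbstractReader {
  // Abstract builder steps; this.type preserves the concrete subclass type.
  def format(source: String): this.type
  def option(key: String, value: String): this.type

  // Shared behavior: typed overloads funnel into the String variant.
  def option(key: String, value: Boolean): this.type = option(key, value.toString)
  def option(key: String, value: Long): this.type = option(key, value.toString)
}

final class ClassicReader extends AbstractReader {
  private var source: String = _
  private var opts = Map.empty[String, String]

  override def format(source: String): this.type = { this.source = source; this }
  override def option(key: String, value: String): this.type = {
    opts += (key -> value)
    this
  }
}
```
The concrete readers additionally re-declare inherited methods as covariant overrides (e.g. `override def option(key: String, value: Long): this.type = super.option(key, value)`), which keeps their existing user-facing signatures stable.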
### Why are the changes needed?
We are creating a unified Scala interface for sql.
### Does this PR introduce _any_ user-facing change?
No.
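For illustration, a hypothetical call site like the one below compiles and behaves the same before and after this change, since `SparkSession.readStream` still returns the implementation-specific `DataStreamReader`:
```scala
// Hypothetical user code; `spark` is an existing SparkSession.
val input = spark.readStream
  .format("json")
  .option("maxFilesPerTrigger", 1L) // typed overload now inherited from sql/api
  .schema("id INT, name STRING")    // DDL-string overload implemented in the shared parent
  .load("/path/to/input")
```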
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48213 from hvanhovell/SPARK-49416.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 10 +-
.../spark/sql/streaming/DataStreamReader.scala | 295 ++++---------------
.../CheckConnectJvmClientCompatibility.scala | 8 +-
project/MimaExcludes.scala | 1 +
.../apache/spark/sql/api}/DataStreamReader.scala | 144 ++++-----
.../org/apache/spark/sql/api/SparkSession.scala | 11 +
.../scala/org/apache/spark/sql/SparkSession.scala | 10 +-
.../spark/sql/streaming/DataStreamReader.scala | 325 ++++-----------------
8 files changed, 201 insertions(+), 603 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5313369a2c98..1b41566ca1d1 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -209,15 +209,7 @@ class SparkSession private[sql] (
/** @inheritdoc */
def read: DataFrameReader = new DataFrameReader(this)
- /**
- * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
- * {{{
- * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
- * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
- * }}}
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def readStream: DataStreamReader = new DataStreamReader(this)
lazy val streams: StreamingQueryManager = new StreamingQueryManager(this)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 789425c9daea..2ff34a634364 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -21,11 +21,9 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Evolving
import org.apache.spark.connect.proto.Read.DataSource
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.{api, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.ConnectConversions._
+import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types.StructType
/**
@@ -35,101 +33,49 @@ import org.apache.spark.sql.types.StructType
* @since 3.5.0
*/
@Evolving
-final class DataStreamReader private[sql] (sparkSession: SparkSession) extends Logging {
+final class DataStreamReader private[sql] (sparkSession: SparkSession)
+ extends api.DataStreamReader {
- /**
- * Specifies the input data source format.
- *
- * @since 3.5.0
- */
- def format(source: String): DataStreamReader = {
+ private val sourceBuilder = DataSource.newBuilder()
+
+ /** @inheritdoc */
+ def format(source: String): this.type = {
sourceBuilder.setFormat(source)
this
}
- /**
- * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
- * automatically from data. By specifying the schema here, the underlying data source can skip
- * the schema inference step, and thus speed up data loading.
- *
- * @since 3.5.0
- */
- def schema(schema: StructType): DataStreamReader = {
+ /** @inheritdoc */
+ def schema(schema: StructType): this.type = {
if (schema != null) {
sourceBuilder.setSchema(schema.json) // Use json. DDL does not retain all the attributes.
}
this
}
- /**
- * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
- * can infer the input schema automatically from data. By specifying the schema here, the
- * underlying data source can skip the schema inference step, and thus speed up data loading.
- *
- * @since 3.5.0
- */
- def schema(schemaString: String): DataStreamReader = {
+ /** @inheritdoc */
+ override def schema(schemaString: String): this.type = {
sourceBuilder.setSchema(schemaString)
this
}
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 3.5.0
- */
- def option(key: String, value: String): DataStreamReader = {
+ /** @inheritdoc */
+ def option(key: String, value: String): this.type = {
sourceBuilder.putOptions(key, value)
this
}
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 3.5.0
- */
- def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 3.5.0
- */
- def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 3.5.0
- */
- def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
-
- /**
- * (Scala-specific) Adds input options for the underlying data source.
- *
- * @since 3.5.0
- */
- def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+ /** @inheritdoc */
+ def options(options: scala.collection.Map[String, String]): this.type = {
this.options(options.asJava)
- this
}
- /**
- * (Java-specific) Adds input options for the underlying data source.
- *
- * @since 3.5.0
- */
- def options(options: java.util.Map[String, String]): DataStreamReader = {
+ /** @inheritdoc */
+ override def options(options: java.util.Map[String, String]): this.type = {
sourceBuilder.putAllOptions(options)
this
}
- /**
- * Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g.
- * external key-value stores).
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def load(): DataFrame = {
sparkSession.newDataFrame { relationBuilder =>
relationBuilder.getReadBuilder
@@ -138,120 +84,14 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
}
}
- /**
- * Loads input in as a `DataFrame`, for data streams that read from some path.
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def load(path: String): DataFrame = {
sourceBuilder.clearPaths()
sourceBuilder.addPaths(path)
load()
}
- /**
- * Loads a JSON file stream and returns the results as a `DataFrame`.
- *
- * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON)
is supported by
- * default. For JSON (one record per file), set the `multiLine` option to
true.
- *
- * This function goes through the input once to determine the input schema.
If you know the
- * schema in advance, use the version that specifies the schema to avoid the
extra scan.
- *
- * You can set the following option(s): <ul> <li>`maxFilesPerTrigger`
(default: no max limit):
- * sets the maximum number of new files to be considered in every
trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total
size of new files to
- * be considered in every trigger.</li> </ul>
- *
- * You can find the JSON-specific options for reading JSON file stream in <a
- *
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.5.0
- */
- def json(path: String): DataFrame = {
- format("json").load(path)
- }
-
- /**
- * Loads a CSV file stream and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.</li> </ul>
- *
- * You can find the CSV-specific options for reading CSV file stream in <a
- * href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.5.0
- */
- def csv(path: String): DataFrame = format("csv").load(path)
-
- /**
- * Loads a XML file stream and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.</li> </ul>
- *
- * You can find the XML-specific options for reading XML file stream in <a
- * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 4.0.0
- */
- def xml(path: String): DataFrame = format("xml").load(path)
-
- /**
- * Loads a ORC file stream, returning the result as a `DataFrame`.
- *
- * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.</li> </ul>
- *
- * ORC-specific option(s) for reading ORC file stream can be found in <a href=
- * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data
- * Source Option</a> in the version you use.
- *
- * @since 3.5.0
- */
- def orc(path: String): DataFrame = format("orc").load(path)
-
- /**
- * Loads a Parquet file stream, returning the result as a `DataFrame`.
- *
- * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.</li> </ul>
- *
- * Parquet-specific option(s) for reading Parquet file stream can be found in <a href=
- * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data
- * Source Option</a> in the version you use.
- *
- * @since 3.5.0
- */
- def parquet(path: String): DataFrame = format("parquet").load(path)
-
- /**
- * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
- * support streaming mode.
- * @param tableName
- * The name of the table
- * @since 3.5.0
- */
+ /** @inheritdoc */
def table(tableName: String): DataFrame = {
require(tableName != null, "The table name can't be null")
sparkSession.newDataFrame { builder =>
@@ -263,59 +103,44 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
}
}
- /**
- * Loads text files and returns a `DataFrame` whose schema starts with a string column named
- * "value", and followed by partitioned columns if there are any. The text files must be encoded
- * as UTF-8.
- *
- * By default, each line in the text files is a new row in the resulting DataFrame. For example:
- * {{{
- * // Scala:
- * spark.readStream.text("/path/to/directory/")
- *
- * // Java:
- * spark.readStream().text("/path/to/directory/")
- * }}}
- *
- * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.</li> </ul>
- *
- * You can find the text-specific options for reading text files in <a
- * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 3.5.0
- */
- def text(path: String): DataFrame = format("text").load(path)
-
- /**
- * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
- * contains a single string column named "value". The text files must be encoded as UTF-8.
- *
- * If the directory structure of the text files contains partitioning information, those are
- * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
- *
- * By default, each line in the text file is a new element in the resulting Dataset. For
- * example:
- * {{{
- * // Scala:
- * spark.readStream.textFile("/path/to/spark/README.md")
- *
- * // Java:
- * spark.readStream().textFile("/path/to/spark/README.md")
- * }}}
- *
- * You can set the text-specific options as specified in `DataStreamReader.text`.
- *
- * @param path
- * input path
- * @since 3.5.0
- */
- def textFile(path: String): Dataset[String] = {
- text(path).select("value").as[String](StringEncoder)
+ override protected def assertNoSpecifiedSchema(operation: String): Unit = {
+ if (sourceBuilder.hasSchema) {
+ throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
+ }
}
- private val sourceBuilder = DataSource.newBuilder()
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Covariant overrides.
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ /** @inheritdoc */
+ override def option(key: String, value: Boolean): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Long): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Double): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def json(path: String): DataFrame = super.json(path)
+
+ /** @inheritdoc */
+ override def csv(path: String): DataFrame = super.csv(path)
+
+ /** @inheritdoc */
+ override def xml(path: String): DataFrame = super.xml(path)
+
+ /** @inheritdoc */
+ override def orc(path: String): DataFrame = super.orc(path)
+
+ /** @inheritdoc */
+ override def parquet(path: String): DataFrame = super.parquet(path)
+
+ /** @inheritdoc */
+ override def text(path: String): DataFrame = super.text(path)
+
+ /** @inheritdoc */
+ override def textFile(path: String): Dataset[String] = super.textFile(path)
+
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 16f6983efb18..c8776af18a14 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -304,7 +304,13 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.DataFrameReader.validateJsonSchema"),
ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.DataFrameReader.validateXmlSchema"))
+ "org.apache.spark.sql.DataFrameReader.validateXmlSchema"),
+
+ // Protected DataStreamReader methods...
+ ProblemFilters.exclude[DirectMissingMethodProblem](
+ "org.apache.spark.sql.streaming.DataStreamReader.validateJsonSchema"),
+ ProblemFilters.exclude[DirectMissingMethodProblem](
+ "org.apache.spark.sql.streaming.DataStreamReader.validateXmlSchema"))
checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9a89ebb4797c..0bd0121e6e14 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -179,6 +179,7 @@ object MimaExcludes {
// SPARK-49282: Shared SparkSessionBuilder
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SparkSession$Builder"),
) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
+ loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++
loggingExcludes("org.apache.spark.sql.SparkSession#Builder")
// Default exclude rules
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala
similarity index 75%
copy from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
copy to sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala
index 789425c9daea..219ecb77d403 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala
@@ -14,113 +14,94 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.spark.sql.streaming
+package org.apache.spark.sql.api
import scala.jdk.CollectionConverters._
+import _root_.java
+
import org.apache.spark.annotation.Evolving
-import org.apache.spark.connect.proto.Read.DataSource
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.types.StructType
/**
* Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
* key-value stores, etc). Use `SparkSession.readStream` to access this.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
@Evolving
-final class DataStreamReader private[sql] (sparkSession: SparkSession) extends Logging {
+abstract class DataStreamReader {
/**
* Specifies the input data source format.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def format(source: String): DataStreamReader = {
- sourceBuilder.setFormat(source)
- this
- }
+ def format(source: String): this.type
/**
* Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data source can skip
* the schema inference step, and thus speed up data loading.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def schema(schema: StructType): DataStreamReader = {
- if (schema != null) {
- sourceBuilder.setSchema(schema.json) // Use json. DDL does not retain all the attributes.
- }
- this
- }
+ def schema(schema: StructType): this.type
/**
* Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
* can infer the input schema automatically from data. By specifying the schema here, the
* underlying data source can skip the schema inference step, and thus speed up data loading.
*
- * @since 3.5.0
+ * @since 2.3.0
*/
- def schema(schemaString: String): DataStreamReader = {
- sourceBuilder.setSchema(schemaString)
- this
+ def schema(schemaString: String): this.type = {
+ schema(StructType.fromDDL(schemaString))
}
/**
* Adds an input option for the underlying data source.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def option(key: String, value: String): DataStreamReader = {
- sourceBuilder.putOptions(key, value)
- this
- }
+ def option(key: String, value: String): this.type
/**
* Adds an input option for the underlying data source.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
+ def option(key: String, value: Boolean): this.type = option(key, value.toString)
/**
* Adds an input option for the underlying data source.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
+ def option(key: String, value: Long): this.type = option(key, value.toString)
/**
* Adds an input option for the underlying data source.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
+ def option(key: String, value: Double): this.type = option(key, value.toString)
/**
* (Scala-specific) Adds input options for the underlying data source.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def options(options: scala.collection.Map[String, String]): DataStreamReader = {
- this.options(options.asJava)
- this
- }
+ def options(options: scala.collection.Map[String, String]): this.type
/**
* (Java-specific) Adds input options for the underlying data source.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def options(options: java.util.Map[String, String]): DataStreamReader = {
- sourceBuilder.putAllOptions(options)
+ def options(options: java.util.Map[String, String]): this.type = {
+ this.options(options.asScala)
this
}
@@ -128,26 +109,16 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g.
* external key-value stores).
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def load(): DataFrame = {
- sparkSession.newDataFrame { relationBuilder =>
- relationBuilder.getReadBuilder
- .setIsStreaming(true)
- .setDataSource(sourceBuilder.build())
- }
- }
+ def load(): Dataset[Row]
/**
* Loads input in as a `DataFrame`, for data streams that read from some path.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def load(path: String): DataFrame = {
- sourceBuilder.clearPaths()
- sourceBuilder.addPaths(path)
- load()
- }
+ def load(path: String): Dataset[Row]
/**
* Loads a JSON file stream and returns the results as a `DataFrame`.
@@ -167,9 +138,10 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
* Data Source Option</a> in the version you use.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def json(path: String): DataFrame = {
+ def json(path: String): Dataset[Row] = {
+ validateJsonSchema()
format("json").load(path)
}
@@ -189,9 +161,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
* Data Source Option</a> in the version you use.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def csv(path: String): DataFrame = format("csv").load(path)
+ def csv(path: String): Dataset[Row] = format("csv").load(path)
/**
* Loads a XML file stream and returns the result as a `DataFrame`.
@@ -211,7 +183,10 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
*
* @since 4.0.0
*/
- def xml(path: String): DataFrame = format("xml").load(path)
+ def xml(path: String): Dataset[Row] = {
+ validateXmlSchema()
+ format("xml").load(path)
+ }
/**
* Loads a ORC file stream, returning the result as a `DataFrame`.
@@ -225,9 +200,11 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data
* Source Option</a> in the version you use.
*
- * @since 3.5.0
+ * @since 2.3.0
*/
- def orc(path: String): DataFrame = format("orc").load(path)
+ def orc(path: String): Dataset[Row] = {
+ format("orc").load(path)
+ }
/**
* Loads a Parquet file stream, returning the result as a `DataFrame`.
@@ -241,27 +218,20 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data
* Source Option</a> in the version you use.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def parquet(path: String): DataFrame = format("parquet").load(path)
+ def parquet(path: String): Dataset[Row] = {
+ format("parquet").load(path)
+ }
/**
* Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
* support streaming mode.
* @param tableName
* The name of the table
- * @since 3.5.0
+ * @since 3.1.0
*/
- def table(tableName: String): DataFrame = {
- require(tableName != null, "The table name can't be null")
- sparkSession.newDataFrame { builder =>
- builder.getReadBuilder
- .setIsStreaming(true)
- .getNamedTableBuilder
- .setUnparsedIdentifier(tableName)
- .putAllOptions(sourceBuilder.getOptionsMap)
- }
- }
+ def table(tableName: String): Dataset[Row]
/**
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
@@ -286,9 +256,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
* Data Source Option</a> in the version you use.
*
- * @since 3.5.0
+ * @since 2.0.0
*/
- def text(path: String): DataFrame = format("text").load(path)
+ def text(path: String): Dataset[Row] = format("text").load(path)
/**
* Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
@@ -311,11 +281,17 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
*
* @param path
* input path
- * @since 3.5.0
+ * @since 2.1.0
*/
def textFile(path: String): Dataset[String] = {
- text(path).select("value").as[String](StringEncoder)
+ assertNoSpecifiedSchema("textFile")
+ text(path).select("value").as(Encoders.STRING)
}
- private val sourceBuilder = DataSource.newBuilder()
+ protected def assertNoSpecifiedSchema(operation: String): Unit
+
+ protected def validateJsonSchema(): Unit = ()
+
+ protected def validateXmlSchema(): Unit = ()
+
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 2295c153cd51..0f73a94c3c4a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -506,6 +506,17 @@ abstract class SparkSession extends Serializable with Closeable {
*/
def read: DataFrameReader
+ /**
+ * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
+ * {{{
+ * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
+ * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
+ * }}}
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ def readStream: DataStreamReader
+
/**
* (Scala-specific) Implicit methods available in Scala for converting common Scala objects into
* `DataFrame`s.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index fe139d629eb2..983cc24718fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -739,15 +739,7 @@ class SparkSession private(
/** @inheritdoc */
def read: DataFrameReader = new DataFrameReader(self)
- /**
- * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
- * {{{
- * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
- * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
- * }}}
- * }}}
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def readStream: DataStreamReader = new DataStreamReader(self)
// scalastyle:off
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 24d769fc8fc8..f42d8b667ab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -22,12 +22,12 @@ import java.util.Locale
import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Evolving
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{api, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -49,25 +49,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* @since 2.0.0
*/
@Evolving
-final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
- /**
- * Specifies the input data source format.
- *
- * @since 2.0.0
- */
- def format(source: String): DataStreamReader = {
+final class DataStreamReader private[sql](sparkSession: SparkSession) extends api.DataStreamReader {
+ /** @inheritdoc */
+ def format(source: String): this.type = {
this.source = source
this
}
- /**
- * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
- * automatically from data. By specifying the schema here, the underlying data source can
- * skip the schema inference step, and thus speed up data loading.
- *
- * @since 2.0.0
- */
- def schema(schema: StructType): DataStreamReader = {
+ /** @inheritdoc */
+ def schema(schema: StructType): this.type = {
if (schema != null) {
val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
this.userSpecifiedSchema = Option(replaced)
@@ -75,75 +65,19 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
this
}
- /**
- * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON) can
- * infer the input schema automatically from data. By specifying the schema here, the underlying
- * data source can skip the schema inference step, and thus speed up data loading.
- *
- * @since 2.3.0
- */
- def schema(schemaString: String): DataStreamReader = {
- schema(StructType.fromDDL(schemaString))
- }
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: String): DataStreamReader = {
+ /** @inheritdoc */
+ def option(key: String, value: String): this.type = {
this.extraOptions += (key -> value)
this
}
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
-
- /**
- * (Scala-specific) Adds input options for the underlying data source.
- *
- * @since 2.0.0
- */
- def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+ /** @inheritdoc */
+ def options(options: scala.collection.Map[String, String]): this.type = {
this.extraOptions ++= options
this
}
- /**
- * (Java-specific) Adds input options for the underlying data source.
- *
- * @since 2.0.0
- */
- def options(options: java.util.Map[String, String]): DataStreamReader = {
- this.options(options.asScala)
- this
- }
-
-
- /**
- * Loads input data stream in as a `DataFrame`, for data streams that don't require a path
- * (e.g. external key-value stores).
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def load(): DataFrame = loadInternal(None)
private def loadInternal(path: Option[String]): DataFrame = {
@@ -205,11 +139,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
}
}
- /**
- * Loads input in as a `DataFrame`, for data streams that read from some path.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def load(path: String): DataFrame = {
if (!sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path")) {
@@ -218,133 +148,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
loadInternal(Some(path))
}
- /**
- * Loads a JSON file stream and returns the results as a `DataFrame`.
- *
- * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
- * default. For JSON (one record per file), set the `multiLine` option to true.
- *
- * This function goes through the input once to determine the input schema. If you know the
- * schema in advance, use the version that specifies the schema to avoid the extra scan.
- *
- * You can set the following option(s):
- * <ul>
- * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.</li>
- * </ul>
- *
- * You can find the JSON-specific options for reading JSON file stream in
- * <a href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 2.0.0
- */
- def json(path: String): DataFrame = {
- userSpecifiedSchema.foreach(checkJsonSchema)
- format("json").load(path)
- }
-
- /**
- * Loads a CSV file stream and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can set the following option(s):
- * <ul>
- * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.</li>
- * </ul>
- *
- * You can find the CSV-specific options for reading CSV file stream in
- * <a href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 2.0.0
- */
- def csv(path: String): DataFrame = format("csv").load(path)
-
- /**
- * Loads a XML file stream and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can set the following option(s):
- * <ul>
- * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.</li>
- * </ul>
- *
- * You can find the XML-specific options for reading XML file stream in
- * <a href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 4.0.0
- */
- def xml(path: String): DataFrame = {
- userSpecifiedSchema.foreach(checkXmlSchema)
- format("xml").load(path)
- }
-
- /**
- * Loads a ORC file stream, returning the result as a `DataFrame`.
- *
- * You can set the following option(s):
- * <ul>
- * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.</li>
- * </ul>
- *
- * ORC-specific option(s) for reading ORC file stream can be found in
- * <a href=
- * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 2.3.0
- */
- def orc(path: String): DataFrame = {
- format("orc").load(path)
- }
-
- /**
- * Loads a Parquet file stream, returning the result as a `DataFrame`.
- *
- * You can set the following option(s):
- * <ul>
- * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.</li>
- * </ul>
- *
- * Parquet-specific option(s) for reading Parquet file stream can be found in
- * <a href=
- * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 2.0.0
- */
- def parquet(path: String): DataFrame = {
- format("parquet").load(path)
- }
-
- /**
- * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
- * support streaming mode.
- * @param tableName The name of the table
- * @since 3.1.0
- */
+ /** @inheritdoc */
def table(tableName: String): DataFrame = {
require(tableName != null, "The table name can't be null")
val identifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
@@ -356,65 +160,56 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
isStreaming = true))
}
- /**
- * Loads text files and returns a `DataFrame` whose schema starts with a string column named
- * "value", and followed by partitioned columns if there are any.
- * The text files must be encoded as UTF-8.
- *
- * By default, each line in the text files is a new row in the resulting DataFrame. For example:
- * {{{
- * // Scala:
- * spark.readStream.text("/path/to/directory/")
- *
- * // Java:
- * spark.readStream().text("/path/to/directory/")
- * }}}
- *
- * You can set the following option(s):
- * <ul>
- * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.</li>
- * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.</li>
- * </ul>
- *
- * You can find the text-specific options for reading text files in
- * <a href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
- * Data Source Option</a> in the version you use.
- *
- * @since 2.0.0
- */
- def text(path: String): DataFrame = format("text").load(path)
-
- /**
- * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
- * contains a single string column named "value".
- * The text files must be encoded as UTF-8.
- *
- * If the directory structure of the text files contains partitioning information, those are
- * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
- *
- * By default, each line in the text file is a new element in the resulting Dataset. For example:
- * {{{
- * // Scala:
- * spark.readStream.textFile("/path/to/spark/README.md")
- *
- * // Java:
- * spark.readStream().textFile("/path/to/spark/README.md")
- * }}}
- *
- * You can set the text-specific options as specified in `DataStreamReader.text`.
- *
- * @param path input path
- * @since 2.1.0
- */
- def textFile(path: String): Dataset[String] = {
+ override protected def assertNoSpecifiedSchema(operation: String): Unit = {
if (userSpecifiedSchema.nonEmpty) {
- throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError("textFile")
+ throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
}
- text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
+ override protected def validateJsonSchema(): Unit = userSpecifiedSchema.foreach(checkJsonSchema)
+
+ override protected def validateXmlSchema(): Unit = userSpecifiedSchema.foreach(checkXmlSchema)
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Covariant overrides.
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ /** @inheritdoc */
+ override def schema(schemaString: String): this.type = super.schema(schemaString)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Boolean): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Long): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Double): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def options(options: java.util.Map[String, String]): this.type = super.options(options)
+
+ /** @inheritdoc */
+ override def json(path: String): DataFrame = super.json(path)
+
+ /** @inheritdoc */
+ override def csv(path: String): DataFrame = super.csv(path)
+
+ /** @inheritdoc */
+ override def xml(path: String): DataFrame = super.xml(path)
+
+ /** @inheritdoc */
+ override def orc(path: String): DataFrame = super.orc(path)
+
+ /** @inheritdoc */
+ override def parquet(path: String): DataFrame = super.parquet(path)
+
+ /** @inheritdoc */
+ override def text(path: String): DataFrame = super.text(path)
+
+ /** @inheritdoc */
+ override def textFile(path: String): Dataset[String] = super.textFile(path)
+
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]